Implement new streaming APIs. This adds new functions that are used by coordinators and workers to negotiate an RPC stream. A stream is simply any response that requires multiple messages from the worker.
BugzId: 22729 Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/8f2c2956 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/8f2c2956 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/8f2c2956 Branch: refs/heads/windsor-merge Commit: 8f2c2956a80afce73e4193819bf6c973376a28cd Parents: b28cbb7 Author: Paul J. Davis <[email protected]> Authored: Wed Aug 14 09:06:05 2013 -0500 Committer: Robert Newson <[email protected]> Committed: Wed Jul 23 17:58:04 2014 +0100 ---------------------------------------------------------------------- src/rexi.erl | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/8f2c2956/src/rexi.erl ---------------------------------------------------------------------- diff --git a/src/rexi.erl b/src/rexi.erl index d37d360..75bc9bf 100644 --- a/src/rexi.erl +++ b/src/rexi.erl @@ -15,6 +15,8 @@ -export([cast/2, cast/3, cast/4, kill/2]). -export([reply/1, sync_reply/1, sync_reply/2]). -export([async_server_call/2, async_server_call/3]). +-export([stream_init/0, stream_init/1]). +-export([stream_start/1, stream_cancel/1]). -export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]). -include_lib("rexi/include/rexi.hrl"). @@ -115,6 +117,50 @@ sync_reply(Reply, Timeout) -> timeout end. +%% @equiv stream_init(300000) +stream_init() -> + stream_init(300000). + +%% @doc Initialize an RPC stream that involves sending multiple +%% messages back to the coordinator. +%% +%% This should be called by rexi workers. It blocks until the +%% coordinator responds with whether this worker should proceed. +%% This function will either return with `ok` or call +%% `erlang:exit/1`. +-spec stream_init(pos_integer()) -> ok. 
+stream_init(Timeout) -> + case sync_reply(rexi_STREAM_INIT, Timeout) of + rexi_STREAM_START -> + ok; + rexi_STREAM_CANCEL -> + exit(normal); + timeout -> + exit(timeout); + Else -> + exit({invalid_stream_message, Else}) + end. + +%% @doc Start a worker stream +%% +%% If a coordinator wants to continue using a streaming worker it +%% should use this function to inform the worker to continue +%% sending messages. The `From` should be the value provided by +%% the worker in the rexi_STREAM_INIT message. +-spec stream_start({pid(), any()}) -> ok. +stream_start({Pid, _Tag}=From) when is_pid(Pid) -> + gen_server:reply(From, rexi_STREAM_START). + +%% @doc Cancel a worker stream +%% +%% If a coordinator decides that a worker is not going to be part +%% of the response it should use this function to cancel the worker. +%% The `From` should be the value provided by the worker in the +%% rexi_STREAM_INIT message. +-spec stream_cancel({pid(), any()}) -> ok. +stream_cancel({Pid, _Tag}=From) when is_pid(Pid) -> + gen_server:reply(From, rexi_STREAM_CANCEL). + %% @equiv stream(Msg, 100, 300000) stream(Msg) -> stream(Msg, 10, 300000).
