Author: kocolosk
Date: Sat Jul 18 02:18:38 2009
New Revision: 795297
URL: http://svn.apache.org/viewvc?rev=795297&view=rev
Log:
listen for local update notifications when continuous=true
Modified:
couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
Modified: couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl
URL:
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl?rev=795297&r1=795296&r2=795297&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_changes_feed.erl Sat Jul 18 02:18:38
2009
@@ -1,12 +1,12 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy
of
+% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
@@ -222,6 +222,18 @@
decode_row(Row) ->
?JSON_DECODE(Row).
+flush_updated_messages() ->
+ receive updated -> flush_updated_messages()
+ after 0 -> ok
+ end.
+
+local_update_notification(Self, DbName, {updated, DbName}) ->
+ Self ! updated;
+local_update_notification(Self, DbName, {updated, DbName}) ->
+ Self ! deleted;
+local_update_notification(_, _, _) ->
+ ok.
+
maybe_stream_next(false, Count, Id) when Count < ?MIN_BUFFER_SIZE ->
?LOG_DEBUG("~p reqid ~p streaming next chunk", [?MODULE, Id]),
ibrowse:stream_next(Id);
@@ -230,8 +242,11 @@
ok.
send_local_changes_forever(Server, DbName, Since) ->
+ Self = self(),
+ {ok, _} = couch_db_update_notifier:start_link(
+ fun(Msg) -> local_update_notification(Self, DbName, Msg) end),
{ok, NewSeq} = send_local_changes_once(Server, DbName, Since),
- timer:sleep(5000),
+ ok = wait_db_updated(),
send_local_changes_forever(Server, DbName, NewSeq).
send_local_changes_once(Server, DbName, Since) ->
@@ -268,3 +283,12 @@
{ibrowse_req_id, Id} =
ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity),
{Pid, Id}.
+
+wait_db_updated() ->
+ receive deleted ->
+ exit(deleted)
+ after 0 ->
+ receive updated ->
+ flush_updated_messages()
+ end
+ end.