Thanks for fixing this, Filipe. I think we should revisit the decision to keep an open reference to the DB in couch_view_group after 1.1. I don't see any good reason for it; the updater and compactor could easily open the DB themselves. I think it would clean up a lot of very messy and error-prone code. Regards,
Adam On Nov 17, 2010, at 7:11 PM, [email protected] wrote: > Author: fdmanana > Date: Thu Nov 18 00:11:18 2010 > New Revision: 1036294 > > URL: http://svn.apache.org/viewvc?rev=1036294&view=rev > Log: > Make sure that after a database compaction the old database reference > counters don't get unreleased forever. > Closes COUCHDB-926. > > > Modified: > couchdb/trunk/src/couchdb/couch_auth_cache.erl > couchdb/trunk/src/couchdb/couch_db.erl > couchdb/trunk/src/couchdb/couch_db_updater.erl > couchdb/trunk/src/couchdb/couch_view_group.erl > > Modified: couchdb/trunk/src/couchdb/couch_auth_cache.erl > URL: > http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_auth_cache.erl?rev=1036294&r1=1036293&r2=1036294&view=diff > ============================================================================== > --- couchdb/trunk/src/couchdb/couch_auth_cache.erl (original) > +++ couchdb/trunk/src/couchdb/couch_auth_cache.erl Thu Nov 18 00:11:18 2010 > @@ -336,7 +336,7 @@ cache_needs_refresh() -> > > > reopen_auth_db(AuthDb) -> > - case (catch gen_server:call(AuthDb#db.main_pid, get_db, infinity)) of > + case (catch couch_db:reopen(AuthDb)) of > {ok, AuthDb2} -> > AuthDb2; > _ -> > > Modified: couchdb/trunk/src/couchdb/couch_db.erl > URL: > http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.erl?rev=1036294&r1=1036293&r2=1036294&view=diff > ============================================================================== > --- couchdb/trunk/src/couchdb/couch_db.erl (original) > +++ couchdb/trunk/src/couchdb/couch_db.erl Thu Nov 18 00:11:18 2010 > @@ -27,6 +27,7 @@ > -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). > -export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]). > -export([check_is_admin/1, check_is_reader/1]). > +-export([reopen/1]). > > -include("couch_db.hrl"). > > @@ -86,6 +87,18 @@ open(DbName, Options) -> > Else -> Else > end. > > +reopen(#db{main_pid = Pid, fd_ref_counter = OldRefCntr}) -> > + {ok, #db{fd_ref_counter = NewRefCntr} = NewDb} = > + gen_server:call(Pid, get_db, infinity), > + case NewRefCntr =:= OldRefCntr of > + true -> > + {ok, NewDb}; > + false -> > + couch_ref_counter:add(NewRefCntr), > + couch_ref_counter:drop(OldRefCntr), > + {ok, NewDb} > + end. > + > ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) -> > ok = gen_server:call(UpdatePid, full_commit, infinity), > {ok, StartTime}. > > Modified: couchdb/trunk/src/couchdb/couch_db_updater.erl > URL: > http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db_updater.erl?rev=1036294&r1=1036293&r2=1036294&view=diff > ============================================================================== > --- couchdb/trunk/src/couchdb/couch_db_updater.erl (original) > +++ couchdb/trunk/src/couchdb/couch_db_updater.erl Thu Nov 18 00:11:18 2010 > @@ -188,6 +188,7 @@ handle_cast({compact_done, CompactFilepa > close_db(Db), > NewDb3 = refresh_validate_doc_funs(NewDb2), > ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb3}, infinity), > + couch_db_update_notifier:notify({compacted, NewDb3#db.name}), > ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]), > {noreply, NewDb3#db{compactor_pid=nil}}; > false -> > > Modified: couchdb/trunk/src/couchdb/couch_view_group.erl > URL: > http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_group.erl?rev=1036294&r1=1036293&r2=1036294&view=diff > ============================================================================== > --- couchdb/trunk/src/couchdb/couch_view_group.erl (original) > +++ couchdb/trunk/src/couchdb/couch_view_group.erl Thu Nov 18 00:11:18 2010 > @@ -32,7 +32,8 @@ > compactor_pid=nil, > waiting_commit=false, > waiting_list=[], > - ref_counter=nil > + ref_counter=nil, > + db_update_notifier=nil > }). > > % api methods > @@ -75,7 +76,7 @@ start_link(InitArgs) -> > end. > > % init creates a closure which spawns the appropriate view_updater. > -init({InitArgs, ReturnPid, Ref}) -> > +init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) -> > process_flag(trap_exit, true), > try prepare_group(InitArgs, false) of > {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} -> > @@ -86,7 +87,15 @@ init({InitArgs, ReturnPid, Ref}) -> > _ -> > couch_db:monitor(Db), > {ok, RefCounter} = couch_ref_counter:start([Fd]), > + Server = self(), > + {ok, Notifier} = couch_db_update_notifier:start_link( > + fun({compacted, DbName1}) when DbName1 =:= DbName -> > + ok = gen_server:cast(Server, reopen_db); > + (_) -> > + ok > + end), > {ok, #group_state{ > + db_update_notifier=Notifier, > db_name=couch_db:name(Db), > init_args=InitArgs, > group=Group, > @@ -119,11 +128,11 @@ init({InitArgs, ReturnPid, Ref}) -> > handle_call({request_group, RequestSeq}, From, > #group_state{ > db_name=DbName, > - group=#group{current_seq=Seq}=Group, > + group=#group{current_seq=Seq, db=OldDb}=Group, > updater_pid=nil, > waiting_list=WaitList > }=State) when RequestSeq > Seq -> > - {ok, Db} = couch_db:open_int(DbName, []), > + {ok, Db} = reopen_db(DbName, OldDb), > Group2 = Group#group{db=Db}, > Owner = self(), > Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end), > @@ -158,11 +167,11 @@ handle_call(request_group_info, _From, S > handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} > = State) -> > #group_state{ > - group = #group{name = GroupId, sig = GroupSig} = Group, > + group = #group{name = GroupId, sig = GroupSig, db = OldDb} = Group, > init_args = {RootDir, DbName, _} > } = State, > ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]), > - {ok, Db} = couch_db:open_int(DbName, []), > + {ok, Db} = reopen_db(DbName, OldDb), > {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), > NewGroup = reset_file(Db, Fd, DbName, Group), > Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), > @@ -225,13 +234,14 @@ handle_cast({compact_done, NewGroup}, St > ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " > ++ > "compact: ~p", [DbName, GroupId, CurrentSeq, > NewGroup#group.current_seq]), > couch_db:close(NewGroup#group.db), > - {ok, Db} = couch_db:open_int(DbName, []), > Pid = spawn_link(fun() -> > + {ok, Db} = couch_db:open_int(DbName, []), > {_,Ref} = erlang:spawn_monitor(fun() -> > couch_view_updater:update(nil, NewGroup#group{db = Db}) > end), > receive > {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> > + couch_db:close(Db), > #group{name=GroupId} = NewGroup2, > Pid2 = couch_view:get_group_server(DbName, GroupId), > gen_server:cast(Pid2, {compact_done, NewGroup2}) > @@ -255,7 +265,11 @@ handle_cast({partial_update, Pid, NewGro > {noreply, State#group_state{group=NewGroup, waiting_commit=true}}; > handle_cast({partial_update, _, _}, State) -> > %% message from an old (probably pre-compaction) updater; ignore > - {noreply, State}. > + {noreply, State}; > + > +handle_cast(reopen_db, #group_state{group = Group, db_name = DbName} = > State) -> > + {ok, Db} = reopen_db(DbName, Group#group.db), > + {noreply, State#group_state{group = Group#group{db = Db}}}. > > handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> > {ok, Db} = couch_db:open_int(DbName, []), > @@ -341,6 +355,7 @@ handle_info({'DOWN',_,_,_,_}, State) -> > > > terminate(Reason, #group_state{updater_pid=Update, compactor_pid=Compact}=S) > -> > + couch_db_update_notifier:stop(S#group_state.db_update_notifier), > reply_all(S, Reason), > couch_util:shutdown_sync(Update), > couch_util:shutdown_sync(Compact), > @@ -372,8 +387,8 @@ reply_all(#group_state{waiting_list=Wait > [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList], > State#group_state{waiting_list=[]}. > > -prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> > - case couch_db:open_int(DbName, []) of > +prepare_group({RootDir, DbName, #group{sig=Sig, db=OldDb}=Group}, > ForceReset)-> > + case reopen_db(DbName, OldDb) of > {ok, Db} -> > case open_index_file(RootDir, DbName, Sig) of > {ok, Fd} -> > @@ -634,4 +649,7 @@ init_group(Db, Fd, #group{def_lang=Lang, > Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, > id_btree=IdBtree, views=Views2}. > > - > +reopen_db(DbName, nil) -> > + couch_db:open_int(DbName, []); > +reopen_db(_DbName, Db) -> > + couch_db:reopen(Db). > >
