Author: fdmanana
Date: Wed Dec 8 11:02:44 2010
New Revision: 1043352
URL: http://svn.apache.org/viewvc?rev=1043352&view=rev
Log:
Add a dedicated couch_file server to the DB updater process.
This improves both read and write performance (especially the latter) when
there's a mix of read and write requests in parallel.
Closes COUCHDB-976.
Modified:
couchdb/trunk/src/couchdb/couch_btree.erl
couchdb/trunk/src/couchdb/couch_db.erl
couchdb/trunk/src/couchdb/couch_db.hrl
couchdb/trunk/src/couchdb/couch_db_updater.erl
couchdb/trunk/src/couchdb/couch_file.erl
Modified: couchdb/trunk/src/couchdb/couch_btree.erl
URL:
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_btree.erl?rev=1043352&r1=1043351&r2=1043352&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_btree.erl (original)
+++ couchdb/trunk/src/couchdb/couch_btree.erl Wed Dec 8 11:02:44 2010
@@ -16,17 +16,9 @@
-export([fold/4, full_reduce/1, final_reduce/2, foldl/3, foldl/4]).
-export([fold_reduce/4, lookup/2, get_state/1, set_options/2]).
+-include("couch_db.hrl").
-define(CHUNK_THRESHOLD, 16#4ff).
--record(btree,
- {fd,
- root,
- extract_kv = fun({Key, Value}) -> {Key, Value} end,
- assemble_kv = fun(Key, Value) -> {Key, Value} end,
- less = fun(A, B) -> A < B end,
- reduce = nil
- }).
-
extract(#btree{extract_kv=Extract}, Value) ->
Extract(Value).
Modified: couchdb/trunk/src/couchdb/couch_db.erl
URL:
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.erl?rev=1043352&r1=1043351&r2=1043352&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_db.erl Wed Dec 8 11:02:44 2010
@@ -223,7 +223,7 @@ get_full_doc_info(Db, Id) ->
Result.
get_full_doc_infos(Db, Ids) ->
- couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
+ couch_btree:lookup(by_id_btree(Db), Ids).
increment_update_seq(#db{update_pid=UpdatePid}) ->
gen_server:call(UpdatePid, increment_update_seq).
@@ -251,11 +251,10 @@ get_db_info(Db) ->
compactor_pid=Compactor,
update_seq=SeqNum,
name=Name,
- fulldocinfo_by_id_btree=FullDocBtree,
instance_start_time=StartTime,
committed_update_seq=CommittedUpdateSeq} = Db,
{ok, Size} = couch_file:bytes(Fd),
- {ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree),
+ {ok, {Count, DelCount}} = couch_btree:full_reduce(by_id_btree(Db)),
InfoList = [
{db_name, Name},
{doc_count, Count},
@@ -270,8 +269,8 @@ get_db_info(Db) ->
],
{ok, InfoList}.
-get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) ->
- {ok,_, Docs} = couch_btree:fold(Btree,
+get_design_docs(Db) ->
+ {ok,_, Docs} = couch_btree:fold(by_id_btree(Db),
fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds,
AccDocs) ->
{ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []),
{ok, [Doc | AccDocs]};
@@ -662,7 +661,7 @@ update_docs(Db, Docs, Options, replicate
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
+ DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.updater_fd)
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts |
Options]),
{ok, DocErrors};
@@ -718,7 +717,7 @@ update_docs(Db, Docs, Options, interacti
true -> [] end ++ Options,
DocBuckets3 = [[
doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.fd)
+ check_dup_atts(Doc)), Db#db.updater_fd)
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
@@ -786,7 +785,10 @@ write_and_commit(#db{update_pid=UpdatePi
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open_ref_counted(Db#db.main_pid, self()),
- DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket]
|| Bucket <- DocBuckets],
+ DocBuckets2 = [
+ [doc_flush_atts(Doc, Db2#db.updater_fd) || Doc <- Bucket] ||
+ Bucket <- DocBuckets
+ ],
% We only retry once
close(Db2),
UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs,
MergeConflicts, FullCommit},
@@ -954,25 +956,28 @@ changes_since(Db, Style, StartSeq, Fun,
end,
Fun(DocInfo2, Acc2)
end,
- {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
+ {ok, _LastReduction, AccOut} = couch_btree:fold(by_seq_btree(Db),
Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
{ok, AccOut}.
count_changes_since(Db, SinceSeq) ->
+ BTree = by_seq_btree(Db),
{ok, Changes} =
- couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree,
+ couch_btree:fold_reduce(BTree,
fun(_SeqStart, PartialReds, 0) ->
- {ok, couch_btree:final_reduce(Db#db.docinfo_by_seq_btree,
PartialReds)}
+ {ok, couch_btree:final_reduce(BTree, PartialReds)}
end,
0, [{start_key, SinceSeq + 1}]),
Changes.
enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
- {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
+ {ok, LastReduction, AccOut} = couch_btree:fold(
+ by_seq_btree(Db), InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
{ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
enum_docs(Db, InFun, InAcc, Options) ->
- {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.fulldocinfo_by_id_btree,
InFun, InAcc, Options),
+ {ok, LastReduce, OutAcc} = couch_btree:fold(
+ by_id_btree(Db), InFun, InAcc, Options),
{ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
% server functions
@@ -1072,7 +1077,7 @@ open_doc_revs_int(Db, IdRevs, Options) -
IdRevs, LookupResults).
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) ->
- case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
+ case couch_btree:lookup(local_btree(Db), [Id]) of
[{ok, {_, {Rev, BodyData}}}] ->
{ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]},
body=BodyData}};
[not_found] ->
@@ -1151,7 +1156,7 @@ doc_to_tree_simple(Doc, [RevId | Rest])
[{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
-make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{updater_fd = Fd} = Db, Id, Deleted, Bp, RevisionPath) ->
{BodyData, Atts} =
case Bp of
nil ->
@@ -1207,7 +1212,19 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp,
}.
-increment_stat(#db{is_sys_db = true}, _Stat) ->
- ok;
-increment_stat(#db{}, Stat) ->
- couch_stats_collector:increment(Stat).
+increment_stat(#db{options = Options}, Stat) ->
+ case lists:member(sys_db, Options) of
+ true ->
+ ok;
+ false ->
+ couch_stats_collector:increment(Stat)
+ end.
+
+local_btree(#db{local_docs_btree = BTree, fd = ReaderFd}) ->
+ BTree#btree{fd = ReaderFd}.
+
+by_seq_btree(#db{docinfo_by_seq_btree = BTree, fd = ReaderFd}) ->
+ BTree#btree{fd = ReaderFd}.
+
+by_id_btree(#db{fulldocinfo_by_id_btree = BTree, fd = ReaderFd}) ->
+ BTree#btree{fd = ReaderFd}.
Modified: couchdb/trunk/src/couchdb/couch_db.hrl
URL:
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.hrl?rev=1043352&r1=1043351&r2=1043352&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_db.hrl Wed Dec 8 11:02:44 2010
@@ -158,6 +158,7 @@
compactor_pid = nil,
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
fd,
+ updater_fd,
fd_ref_counter,
header = #db_header{},
committed_update_seq,
@@ -174,7 +175,7 @@
waiting_delayed_commit = nil,
revs_limit = 1000,
fsync_options = [],
- is_sys_db = false
+ options = []
}).
@@ -297,3 +298,11 @@
db_open_options = []
}).
+-record(btree, {
+ fd,
+ root,
+ extract_kv = fun({_Key, _Value} = KV) -> KV end,
+ assemble_kv = fun(Key, Value) -> {Key, Value} end,
+ less = fun(A, B) -> A < B end,
+ reduce = nil
+}).
Modified: couchdb/trunk/src/couchdb/couch_db_updater.erl
URL:
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db_updater.erl?rev=1043352&r1=1043351&r2=1043352&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db_updater.erl (original)
+++ couchdb/trunk/src/couchdb/couch_db_updater.erl Wed Dec 8 11:02:44 2010
@@ -42,13 +42,14 @@ init({MainPid, DbName, Filepath, Fd, Opt
file:delete(Filepath ++ ".compact")
end
end,
-
- Db = init_db(DbName, Filepath, Fd, Header),
+ ReaderFd = open_reader_fd(Filepath, Options),
+ Db = init_db(DbName, Filepath, Fd, ReaderFd, Header, Options),
Db2 = refresh_validate_doc_funs(Db),
- {ok, Db2#db{main_pid = MainPid, is_sys_db = lists:member(sys_db,
Options)}}.
+ {ok, Db2#db{main_pid = MainPid}}.
terminate(_Reason, Db) ->
+ couch_file:close(Db#db.updater_fd),
couch_file:close(Db#db.fd),
couch_util:shutdown_sync(Db#db.compactor_pid),
couch_util:shutdown_sync(Db#db.fd_ref_counter),
@@ -67,7 +68,7 @@ handle_call(increment_update_seq, _From,
{reply, {ok, Db2#db.update_seq}, Db2};
handle_call({set_security, NewSec}, _From, Db) ->
- {ok, Ptr} = couch_file:append_term(Db#db.fd, NewSec),
+ {ok, Ptr} = couch_file:append_term(Db#db.updater_fd, NewSec),
Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
update_seq=Db#db.update_seq+1}),
ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
@@ -84,7 +85,7 @@ handle_call({purge_docs, _IdRevs}, _From
{reply, {error, purge_during_compaction}, Db};
handle_call({purge_docs, IdRevs}, _From, Db) ->
#db{
- fd=Fd,
+ updater_fd = Fd,
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq,
@@ -161,9 +162,10 @@ handle_call(start_compact, _From, Db) ->
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
+ ReaderFd = open_reader_fd(CompactFilepath, Db#db.options),
{ok, NewHeader} = couch_file:read_header(NewFd),
#db{update_seq=NewSeq} = NewDb =
- init_db(Db#db.name, Filepath, NewFd, NewHeader),
+ init_db(Db#db.name, Filepath, NewFd, ReaderFd, NewHeader,
Db#db.options),
unlink(NewFd),
case Db#db.update_seq == NewSeq of
true ->
@@ -362,7 +364,7 @@ simple_upgrade_record(Old, New) when tup
list_to_tuple(tuple_to_list(Old) ++ NewValuesTail).
-init_db(DbName, Filepath, Fd, Header0) ->
+init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) ->
Header1 = simple_upgrade_record(Header0, #db_header{}),
Header =
case element(2, Header1) of
@@ -403,10 +405,11 @@ init_db(DbName, Filepath, Fd, Header0) -
{MegaSecs, Secs, MicroSecs} = now(),
StartTime = ?l2b(io_lib:format("~p",
[(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
- {ok, RefCntr} = couch_ref_counter:start([Fd]),
+ {ok, RefCntr} = couch_ref_counter:start([Fd, ReaderFd]),
#db{
update_pid=self(),
- fd=Fd,
+ fd = ReaderFd,
+ updater_fd = Fd,
fd_ref_counter = RefCntr,
header=Header,
fulldocinfo_by_id_btree = IdBtree,
@@ -420,9 +423,19 @@ init_db(DbName, Filepath, Fd, Header0) -
security_ptr = SecurityPtr,
instance_start_time = StartTime,
revs_limit = Header#db_header.revs_limit,
- fsync_options = FsyncOptions
+ fsync_options = FsyncOptions,
+ options = Options
}.
+open_reader_fd(Filepath, Options) ->
+ {ok, Fd} = case lists:member(sys_db, Options) of
+ true ->
+ couch_file:open(Filepath, [read_only, sys_db]);
+ false ->
+ couch_file:open(Filepath, [read_only])
+ end,
+ unlink(Fd),
+ Fd.
close_db(#db{fd_ref_counter = RefCntr}) ->
couch_ref_counter:drop(RefCntr).
@@ -443,7 +456,7 @@ refresh_validate_doc_funs(Db) ->
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd,header=Header}=Db,
+flush_trees(#db{updater_fd = Fd, header = Header} = Db,
[InfoUnflushed | RestUnflushed], AccFlushed) ->
#full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
Flushed = couch_key_tree:map(
@@ -694,7 +707,7 @@ commit_data(Db, true) ->
Db;
commit_data(Db, _) ->
#db{
- fd = Fd,
+ updater_fd = Fd,
filepath = Filepath,
header = OldHeader,
fsync_options = FsyncOptions,
@@ -723,7 +736,8 @@ commit_data(Db, _) ->
end.
-copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
+copy_doc_attachments(#db{updater_fd = SrcFd} = SrcDb, {Pos,_RevId},
+ SrcSp, DestFd) ->
{ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp),
% copy the bin values
NewBinInfos = lists:map(
@@ -775,7 +789,7 @@ copy_rev_tree_attachments(SrcDb, DestFd,
end, Tree).
-copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
+copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
@@ -851,7 +865,7 @@ copy_compact(Db, NewDb0, Retry) ->
% copy misc header values
if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr} = couch_file:append_term(NewDb3#db.fd, Db#db.security),
+ {ok, Ptr} = couch_file:append_term(NewDb3#db.updater_fd,
Db#db.security),
NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
true ->
NewDb4 = NewDb3
@@ -873,7 +887,8 @@ start_copy_compact(#db{name=Name,filepat
Retry = false,
ok = couch_file:write_header(Fd, Header=#db_header{})
end,
- NewDb = init_db(Name, CompactFile, Fd, Header),
+ ReaderFd = open_reader_fd(CompactFile, Db#db.options),
+ NewDb = init_db(Name, CompactFile, Fd, ReaderFd, Header, Db#db.options),
unlink(Fd),
NewDb2 = copy_compact(Db, NewDb, Retry),
close_db(NewDb2),
Modified: couchdb/trunk/src/couchdb/couch_file.erl
URL:
http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_file.erl?rev=1043352&r1=1043351&r2=1043352&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_file.erl (original)
+++ couchdb/trunk/src/couchdb/couch_file.erl Wed Dec 8 11:02:44 2010
@@ -19,8 +19,7 @@
-record(file, {
fd,
- tail_append_begin = 0, % 09 UPGRADE CODE
- eof = 0
+ tail_append_begin = 0 % 09 UPGRADE CODE
}).
-export([open/1, open/2, close/1, bytes/1, sync/1,
append_binary/2,old_pread/3]).
@@ -227,10 +226,11 @@ init_status_error(ReturnPid, Ref, Error)
init({Filepath, Options, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
+ OpenOptions = file_open_options(Options),
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
- case file:open(Filepath, [read, append, raw, binary]) of
+ case file:open(Filepath, OpenOptions) of
{ok, Fd} ->
{ok, Length} = file:position(Fd, eof),
case Length > 0 of
@@ -260,16 +260,23 @@ init({Filepath, Options, ReturnPid, Ref}
% open in read mode first, so we don't create the file if it doesn't exist.
case file:open(Filepath, [read, raw]) of
{ok, Fd_Read} ->
- {ok, Fd} = file:open(Filepath, [read, append, raw, binary]),
+ {ok, Fd} = file:open(Filepath, OpenOptions),
ok = file:close(Fd_Read),
maybe_track_open_os_files(Options),
- {ok, Length} = file:position(Fd, eof),
- {ok, #file{fd=Fd, eof=Length}};
+ {ok, #file{fd=Fd}};
Error ->
init_status_error(ReturnPid, Ref, Error)
end
end.
+file_open_options(Options) ->
+ [read, raw, binary] ++ case lists:member(read_only, Options) of
+ true ->
+ [];
+ false ->
+ [append]
+ end.
+
maybe_track_open_os_files(FileOptions) ->
case lists:member(sys_db, FileOptions) of
true ->
@@ -309,28 +316,25 @@ handle_call({pread_iolist, Pos}, _From,
handle_call({pread, Pos, Bytes}, _From,
#file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
{ok, Bin} = file:pread(Fd, Pos, Bytes),
{reply, {ok, Bin, Pos >= TailAppendBegin}, File};
-handle_call(bytes, _From, #file{eof=Length}=File) ->
- {reply, {ok, Length}, File};
+handle_call(bytes, _From, #file{fd = Fd} = File) ->
+ {reply, file:position(Fd, eof), File};
handle_call(sync, _From, #file{fd=Fd}=File) ->
{reply, file:sync(Fd), File};
handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
{ok, Pos} = file:position(Fd, Pos),
- case file:truncate(Fd) of
- ok ->
- {reply, ok, File#file{eof=Pos}};
- Error ->
- {reply, Error, File}
- end;
-handle_call({append_bin, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
+ {reply, file:truncate(Fd), File};
+handle_call({append_bin, Bin}, _From, #file{fd = Fd} = File) ->
+ {ok, Pos} = file:position(Fd, eof),
Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
case file:write(Fd, Blocks) of
ok ->
- {reply, {ok, Pos}, File#file{eof=Pos+iolist_size(Blocks)}};
+ {reply, {ok, Pos}, File};
Error ->
{reply, Error, File}
end;
-handle_call({write_header, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
- BinSize = size(Bin),
+handle_call({write_header, Bin}, _From, #file{fd = Fd} = File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ BinSize = byte_size(Bin),
case Pos rem ?SIZE_BLOCK of
0 ->
Padding = <<>>;
@@ -338,18 +342,12 @@ handle_call({write_header, Bin}, _From,
Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
end,
FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
- case file:write(Fd, FinalBin) of
- ok ->
- {reply, ok, File#file{eof=Pos+iolist_size(FinalBin)}};
- Error ->
- {reply, Error, File}
- end;
-
+ {reply, file:write(Fd, FinalBin), File};
handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
case (catch read_old_header(Fd, Prefix)) of
{ok, Header} ->
- TailAppendBegin = File#file.eof,
+ {ok, TailAppendBegin} = file:position(Fd, eof),
Bin = term_to_binary(Header),
Md5 = couch_util:md5(Bin),
% now we assemble the final header binary and write to disk
@@ -367,7 +365,8 @@ handle_call({upgrade_old_header, Prefix}
end;
-handle_call(find_header, _From, #file{fd=Fd, eof=Pos}=File) ->
+handle_call(find_header, _From, #file{fd = Fd} = File) ->
+ {ok, Pos} = file:position(Fd, eof),
{reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
% 09 UPGRADE CODE