This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch prototype/views in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5be667c4b1e424d031262404ac6f1bab1f5172ce Author: Paul J. Davis <[email protected]> AuthorDate: Thu Jul 18 12:29:14 2019 -0500 Move all fdb writer logic tou couch_views_fdb. --- src/couch_views/include/couch_views.hrl | 5 +- src/couch_views/src/couch_views_fdb.erl | 259 ++++++++++++++++---------------- 2 files changed, 133 insertions(+), 131 deletions(-) diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl index 99a62b0..4fcc57e 100644 --- a/src/couch_views/include/couch_views.hrl +++ b/src/couch_views/include/couch_views.hrl @@ -17,8 +17,9 @@ -define(VIEW_BUILDS, 4). -define(VIEW_STATUS, 5). -define(VIEW_WATCH, 6). --define(VIEW_ROW_KEY, 7). --define(VIEW_ROW_VALUE, 8). + +-define(VIEW_ROW_KEY, 0). +-define(VIEW_ROW_VALUE, 1). % jobs api -define(INDEX_JOB_TYPE, <<"views">>). diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl index 0791ffa..f47f1b1 100644 --- a/src/couch_views/src/couch_views_fdb.erl +++ b/src/couch_views/src/couch_views_fdb.erl @@ -14,20 +14,9 @@ -export([ get_update_seq/2, - update_view_seq/3, - get_seq_key/2, - - clear_id_index/4, - set_id_index/5, - get_id_index/4, - create_id_index_key/4, - - clear_map_index/5, - set_map_index_results/5, - get_map_index_key/4, - get_map_range_keys/3, - get_map_range/4, - unpack_map_row/3 + set_update_seq/3, + + write_rows/4 ]). @@ -42,167 +31,179 @@ % View Build Sequence Access % (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence -get_update_seq(Db, #mrst{sig = Sig}) -> + +get_update_seq(TxDb, #mrst{sig = Sig}) -> #{ + tx := Tx, db_prefix := DbPrefix - } = Db, + } = TxDb, - fabric2_fdb:transactional(Db, fun(TxDb) -> - Key = get_seq_key(Sig, DbPrefix), - Tx = maps:get(tx, TxDb), - case erlfdb:wait(erlfdb:get(Tx, Key)) of - not_found -> 0; - UpdateSeq -> UpdateSeq - end - end). + Key = get_seq_key(Sig, DbPrefix), + case erlfdb:wait(erlfdb:get(Tx, Key)) of + not_found -> <<>>; + UpdateSeq -> UpdateSeq + end. -update_view_seq(Db, Sig, Seq) -> - fabric2_fdb:transactional(Db, fun(TxDb) -> - #{ - db_prefix := DbPrefix, - tx := Tx - } = TxDb, - SeqKey = get_seq_key(Sig, DbPrefix), - erlfdb:set(Tx, SeqKey, Seq) - end). +set_view_seq(TxDb, Sig, Seq) -> + #{ + tx := Tx + db_prefix := DbPrefix, + } = TxDb, + SeqKey = get_seq_key(Sig, DbPrefix), + ok = erlfdb:set(Tx, SeqKey, Seq). -get_seq_key(Sig, DbPrefix) -> - erlfdb_tuple:pack({?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ}, DbPrefix). +write_doc(TxDb, Sig, #{deleted := true} = Doc, ViewIds) -> + #{ + id := DocId + } = Doc, + ViewKeys = get_view_keys(TxDb, Sig, DocId), -% Id Index access + clear_id_idx(TxDb, Sig, DocId), + lists:foreach(fun({ViewId, ViewKeys}) -> + clear_map_idx(TxDb, Sig, ViewId, ViewKeys) + end, ViewKeys). -% (<db>, ?VIEWS, <sig>, ?VIEW_ID_INDEX, <_id>, <view_id>) -> [emitted keys] -clear_id_index(TxDb, Sig, DocId, IdxName) -> +write_doc(TxDb, Sig, Doc, ViewIds) -> #{ - db_prefix := DbPrefix, - tx := Tx + db_prefix := DbPrefix } = TxDb, - IdKey = create_id_index_key(DbPrefix, Sig, DocId, IdxName), - ok = erlfdb:clear(Tx, IdKey). - -set_id_index(TxDb, Sig, IdxName, DocId, IdxKey) -> #{ - db_prefix := DbPrefix, - tx := Tx - } = TxDb, - IdKey = create_id_index_key(DbPrefix, Sig, DocId, IdxName), - erlfdb:set(Tx, IdKey, couch_views_encoding:encode(IdxKey)). + id := DocId, + results := Results + } = Doc, + + ExistingViewKeys = get_view_keys(TxDb, Sig, DocId), + + ok = clear_id_idx(TxDb, Sig, DocId), + + lists:foreach(fun({ViewId, NewRows}) -> + ExistingKeys = fabric2_util:get_value(ViewId, ExistingViewKeys, []), + update_id_idx(TxDb, Sig, ViewId, DocId, NewRows), + update_map_idx(TxDb, Sig, ViewId, DocId, ExitingKeys, NewRows) + end, lists:zip(ViewIds, Results)). -get_id_index(TxDb, Sig, Id, IdxName) -> +clear_id_idx(TxDb, Sig, DocId) -> #{ - db_prefix := DbPrefix, - tx := Tx + tx := Tx, + db_prefix := DbPrefix } = TxDb, - IdKey = create_id_index_key(DbPrefix, Sig, Id, IdxName), - case erlfdb:wait(erlfdb:get(Tx, IdKey)) of - not_found -> not_found; - IdxKey -> couch_views_encoding:decode(IdxKey) - end. + {Start, End} = id_idx_range(DbPrefix, Sig, DocId), + ok = erlfdb:clear_range(Start, End). -create_id_index_key(DbPrefix, Sig, DocId, IdxName) -> - BaseIdKey = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, IdxName}, - erlfdb_tuple:pack(BaseIdKey, DbPrefix). +clear_map_idx(TxDb, Sig, ViewId, ViewKeys) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, -% Map Index Access -% {<db>, ?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, Idx, Key, DocId, -% RowType, Counter} = Values -% RowType = Emitted Keys or Emitted Value + lists:foreach(fun(ViewKey) -> + {Start, End} = map_idx_range(DbPrefix, Sig, ViewId, ViewKey, DocId), + ok = erlfdb:clear_range(Tx, Start, End) + end, ViewKeys). -clear_map_index(TxDb, Sig, IdxName, DocId, IdxKeys) when is_list(IdxKeys) -> - lists:foreach(fun (IdxKey) -> - clear_map_index(TxDb, Sig, IdxName, DocId, IdxKey) - end, IdxKeys); +update_id_idx(TxDb, Sig, ViewId, DocId, NewRows) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, -clear_map_index(TxDb, Sig, IdxName, DocId, IdxKey) -> - #{db_prefix := DbPrefix, tx := Tx} = TxDb, - Key = couch_views_encoding:encode(IdxKey), - BaseKey = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, IdxName, Key, DocId}, - {StartKey, EndKey} = erlfdb_tuple:range(BaseKey, DbPrefix), - ok = erlfdb:clear_range(Tx, StartKey, EndKey). + Unique = lists:usort([K || {K, _V} <- NewRows]), + Key = id_idx_key(DbPrefix, Sig, ViewId, DocId), + Val = couch_views_encoding:encode(Unique), + ok = erlfdb:set(Tx, Key, Val). -set_map_index_results(TxDb, Sig, IdxName, DocId, Results) -> - #{db_prefix := DbPrefix, tx := Tx} = TxDb, - lists:foldl(fun ({IdxKey, IdxValue}, Counter) -> - RowKey = create_map_key(DbPrefix, Sig, IdxName, IdxKey, DocId, - ?VIEW_ROW_KEY, Counter), - RowValue = create_map_key(DbPrefix, Sig, IdxName, IdxKey, DocId, - ?VIEW_ROW_VALUE, Counter), - EncodedKey = pack_value(IdxKey), - EncodedValue = pack_value(IdxValue), +update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, - ok = erlfdb:set(Tx, RowKey, EncodedKey), - ok = erlfdb:set(Tx, RowValue, EncodedValue), - Counter + 1 - end, 0, Results). + Unique = lists:usort([K || {K, _V} <- NewRows]), + KeysToRem = ExistingKeys -- Unique, + lists:foreach(fun(RemKey) -> + {Start, End} = map_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId), + ok = erlfdb:clear_range(Tx, Start, End) + end, KeysToRem), -get_map_index_key(#{db_prefix := DbPrefix}, Sig, IdxName, Key) -> - EncKey = couch_views_encoding:encode(Key), - erlfdb_tuple:pack({?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, - IdxName, EncKey}, DbPrefix). + KVsToAdd = process_rows(NewRows), + MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId), + lists:foreach(fun({DupeId, Key1, Key2, Val}) -> + KeyKey = map_idx_key(MapIdxPrefix, Key1, DocId, DupeId, ?VIEW_ROW_KEY), + ValKey = map_idx_key(MapIdxPrefix, Key1, DocId, DupeId, ?VIEW_ROW_VAL), + ok = erlfdn:store(Tx, KeyKey, Key2), + ok = erlfdb:store(Tx, ValKey, Val) + end, KVsToAdd). -get_map_range_keys(#{db_prefix := DbPrefix}, Sig, IdxName) -> - erlfdb_tuple:range({?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, IdxName}, DbPrefix). +get_view_keys(TxDb, Sig, DocId) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, + {Start, End} = id_idx_range(DbPrefix, Sig, DocId) + lists:map(fun({K, V}) -> + {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId} = + erlfdb_tuple:unpack(K, DbPrefix), + ViewKeys = couch_views_encoding:decode(V) + {ViewId, ViewKeys} + end, erlfdb:get_range(Tx, Start, End, [])). -get_map_range(TxDb, Start, End, Opts) -> - #{tx := Tx} = TxDb, - erlfdb:get_range(Tx, Start, End, Opts). +id_idx_key(DbPrefix, Sig, DocId, ViewId) -> + Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId}, + erlfdb_tuple:pack(Key, DbPrefix). -unpack_map_row(#{db_prefix := DbPrefix}, Key, Value) -> - case erlfdb_tuple:unpack(Key, DbPrefix) of - {?DB_VIEWS, _Sig, ?VIEW_MAP_RANGE, _Idx, _RowKey, Id, - ?VIEW_ROW_KEY, _Counter} -> - RowKey = unpack_value(Value), - {key, Id, RowKey}; - {?DB_VIEWS, _Sig, ?VIEW_MAP_RANGE, _Idx, _RowValue, Id, - ?VIEW_ROW_VALUE, _Counter} -> - RowValue = unpack_value(Value), - {value, Id, RowValue} - end. +id_idx_range(DbPrefix, Sig, DocId) -> + Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId}, + erlfdb_tuple:range(Key, DbPrefix). -create_map_key(DbPrefix, Sig, IdxName, IdxKey, DocId, RowType, Counter) -> - Key = couch_views_encoding:encode(IdxKey), - BaseKey = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, - IdxName, Key, DocId, RowType, Counter}, - erlfdb_tuple:pack(BaseKey, DbPrefix). +map_idx_prefix(DbPrefix, Sig, ViewId) -> + Key = {?DB_VIES, Sig, ?VIEW_MAP_RANGE, ViewId}, + erlfdb_tuple:pack(Key). -% Internal used to packed and unpack Values +map_idx_key(MapIdxPrefix, MapKey, DocId, DupeId, Type) + Key = {MapKey, DocId, DupeId, Type}, + erldb_tuple:encode(Key, MapIdxPrefix). -pack_value(Val) when is_list(Val) -> - erlfdb_tuple:pack({?LIST_VALUE, list_to_tuple(Val)}); +map_idx_range(DbPrefix, Sig, ViewId, MapKey, DocId) -> + Encoded = couch_views_encoding:encode(MapKey, key), + Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, Encoded, DocId}, + erlfdb_tuple:range(Key, DbPrefix). -pack_value(Val) when is_tuple(Val) -> - {Props} = Val, - erlfdb_tuple:pack({?JSON_VALUE, list_to_tuple(Props)}); -pack_value(Val) -> - erlfdb_tuple:pack({?VALUE, Val}). +process_rows(Rows) -> + Encoded = lists:map(fun({K, V}) -> + EK1 = couch_views_encoding:encode(K, key), + EK2 = couch_views_encoding:encode(K, value), + EV = couch_views_encoding:encode(V, value), + {EKK, EKV, EV} + end, Rows), + Grouped = lists:foldl(fun({K1, K2, V}, Acc) -> + dict:append(K1, {K2, V}, Acc) + end, dict:new(), Encoded), -unpack_value(Bin) -> - case erlfdb_tuple:unpack(Bin) of - {?LIST_VALUE, Val} -> - tuple_to_list(Val); - {?JSON_VALUE, Val} -> - {tuple_to_list(Val)}; - {?VALUE, Val} -> - Val - end. + {_, Labeled} = dict:fold(fun(K1, Vals) -> + lists:foldl(fun({K2, V}, {Count, Acc}) -> + {Count + 1, [{Count, K1, K2, V} | Acc]} + end, {0, []}, Vals) + end, [], Grouped), + + Labeled.
