This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch prototype/views in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit ed46cb750969d0c42f0fa91de7d68cc08ba013bd Author: Garren Smith <[email protected]> AuthorDate: Mon Jun 17 15:45:10 2019 +0200 CouchDB map indexes on FDB This adds couch_views which builds map indexes and stores them in FDB. --- rebar.config.script | 1 + rel/overlay/etc/default.ini | 6 + rel/reltool.config | 2 + src/chttpd/src/chttpd_db.erl | 3 +- src/chttpd/src/chttpd_view.erl | 2 +- src/couch_mrview/src/couch_mrview_util.erl | 2 +- src/couch_views/.gitignore | 19 + src/couch_views/README.md | 16 + src/couch_views/include/couch_views.hrl | 94 ++++ src/couch_views/rebar.config | 14 + src/couch_views/src/couch_views.app.src | 31 ++ src/couch_views/src/couch_views.erl | 115 +++++ src/couch_views/src/couch_views_app.erl | 31 ++ src/couch_views/src/couch_views_encoding.erl | 108 +++++ src/couch_views/src/couch_views_fdb.erl | 208 +++++++++ src/couch_views/src/couch_views_indexer.erl | 262 +++++++++++ src/couch_views/src/couch_views_jobs.erl | 122 ++++++ src/couch_views/src/couch_views_reader.erl | 204 +++++++++ src/couch_views/src/couch_views_sup.erl | 46 ++ src/couch_views/src/couch_views_util.erl | 83 ++++ src/couch_views/src/couch_views_worker.erl | 44 ++ src/couch_views/src/couch_views_worker_server.erl | 110 +++++ src/couch_views/test/couch_views_encoding_test.erl | 73 ++++ src/couch_views/test/couch_views_indexer_test.erl | 258 +++++++++++ src/couch_views/test/couch_views_map_test.erl | 484 +++++++++++++++++++++ src/fabric/src/fabric2.hrl | 1 + src/fabric/src/fabric2_view.erl | 81 ++++ test/elixir/test/map_test.exs | 222 ++++++++++ 28 files changed, 2639 insertions(+), 3 deletions(-) diff --git a/rebar.config.script b/rebar.config.script index 116c040..ce79728 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -82,6 +82,7 @@ SubDirs = [ "src/couch_stats", "src/couch_peruser", "src/couch_tests", + "src/couch_views", "src/ddoc_cache", "src/fabric", "src/couch_jobs", diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 
8fd2261..59b7d57 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -223,6 +223,12 @@ iterations = 10 ; iterations for password hashing ; users_db_public = false ; cookie_domain = example.com +; Settings for view indexing +[couch_views] +; type_check_period_msec = 500 +; type_check_max_jitter_msec = 500 +; change_limit = 100 + ; CSP (Content Security Policy) Support for _utils [csp] enable = true diff --git a/rel/reltool.config b/rel/reltool.config index 7b2159d..2b088be 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -42,6 +42,7 @@ couch_stats, couch_event, couch_peruser, + couch_views, ddoc_cache, ets_lru, fabric, @@ -100,6 +101,7 @@ {app, couch_stats, [{incl_cond, include}]}, {app, couch_event, [{incl_cond, include}]}, {app, couch_peruser, [{incl_cond, include}]}, + {app, couch_views, [{incl_cond, include}]}, {app, ddoc_cache, [{incl_cond, include}]}, {app, ets_lru, [{incl_cond, include}]}, {app, fabric, [{incl_cond, include}]}, diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 0c7e4d5..785ca3f 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -334,7 +334,8 @@ handle_design_req(#httpd{ path_parts=[_DbName, _Design, Name, <<"_",_/binary>> = Action | _Rest] }=Req, Db) -> DbName = fabric2_db:name(Db), - case ddoc_cache:open(DbName, <<"_design/", Name/binary>>) of +%% case ddoc_cache:open(DbName, <<"_design/", Name/binary>>) of + case fabric2_db:open_doc(Db, <<"_design/", Name/binary>>) of {ok, DDoc} -> Handler = chttpd_handlers:design_handler(Action, fun bad_action_req/3), Handler(Req, Db, DDoc); diff --git a/src/chttpd/src/chttpd_view.erl b/src/chttpd/src/chttpd_view.erl index 26107d7..849d870 100644 --- a/src/chttpd/src/chttpd_view.erl +++ b/src/chttpd/src/chttpd_view.erl @@ -45,7 +45,7 @@ design_doc_view(Req, Db, DDoc, ViewName, Keys) -> Max = chttpd:chunked_response_buffer_size(), VAcc = #vacc{db=Db, req=Req, threshold=Max}, Options = [{user_ctx, 
Req#httpd.user_ctx}], - {ok, Resp} = fabric:query_view(Db, Options, DDoc, ViewName, + {ok, Resp} = fabric2_view:query(Db, Options, DDoc, ViewName, fun view_cb/2, VAcc, Args), {ok, Resp#vacc.resp}. diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index eb68124..18a4be1 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -497,7 +497,7 @@ fold_reduce({NthRed, Lang, View}, Fun, Acc, Options) -> validate_args(Db, DDoc, Args0) -> - {ok, State} = couch_mrview_index:init(Db, DDoc), + {ok, State} = couch_mrview_util:ddoc_to_mrst(fabric2_db:name(Db), DDoc), Args1 = apply_limit(State#mrst.partitioned, Args0), validate_args(State, Args1). diff --git a/src/couch_views/.gitignore b/src/couch_views/.gitignore new file mode 100644 index 0000000..f1c4554 --- /dev/null +++ b/src/couch_views/.gitignore @@ -0,0 +1,19 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ diff --git a/src/couch_views/README.md b/src/couch_views/README.md new file mode 100644 index 0000000..dba0fcf2 --- /dev/null +++ b/src/couch_views/README.md @@ -0,0 +1,16 @@ +CouchDB Views +===== + +This is the new application that builds and runs Map/reduce views against FoundationDB. +Currently only map indexes are supported and it will always return the full index. + +Code layout: + +* `couch_views` - Main entry point to query a view +* `couch_views_reader` - Reads from the index. 
+* `couch_views_indexer` - Queries the changes feed from the last sequence and updates the index +* `couch_views_fdb` - a wrapper around erlfdb +* `couch_views_encoding` - Emitted key encoding to keep CouchDB sorting rules +* `couch_views_worker_server` - checks for indexing jobs and spawns a worker to build it +* `couch_views_worker` - runs couch_views_indexer and builds index along with sending updates back to jobs +* `couch_views_jobs` - a wrapper around couch_jobs diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl new file mode 100644 index 0000000..f5a9c8b --- /dev/null +++ b/src/couch_views/include/couch_views.hrl @@ -0,0 +1,94 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% indexing +-define(VIEW_UPDATE_SEQ, 1). +-define(VIEW_ID_RANGE, 2). +-define(VIEW_MAP_RANGE, 3). +-define(VIEW_BUILDS, 4). +-define(VIEW_STATUS, 5). +-define(VIEW_WATCH, 6). +-define(VIEW_ROW_KEY, 7). +-define(VIEW_ROW_VALUE, 8). + +% jobs api +-define(INDEX_JOB_TYPE, <<"views">>). + + +-record(mrst, { + sig=nil, + fd=nil, + db_name, + idx_name, + language, + design_opts=[], + seq_indexed=false, + keyseq_indexed=false, + partitioned=false, + lib, + views, + % update_seq=0, + % purge_seq=0, + % first_build, + % partial_resp_pid, + % doc_acc, + % doc_queue, + % write_queue, + qserver=nil +}). 
+ + +-record(mrview, { + id_num, + % update_seq=0, + % purge_seq=0, + map_names=[], + reduce_funs=[], + def, + seq_indexed=false, + keyseq_indexed=false, + options=[] +}). + + +-define(MAX_VIEW_LIMIT, 16#10000000). + + +-record(mrargs, { + view_type, + % reduce, + + % preflight_fun, + + start_key, + start_key_docid, + end_key, + end_key_docid, + keys, + + direction = fwd, + limit = ?MAX_VIEW_LIMIT, + skip = 0, + % group_level = 0, + % group = undefined, + stable = false, + update = true, + multi_get = false, + inclusive_end = true, + include_docs = false, + doc_options = [], + update_seq=false, + conflicts, + % callback, + sorted = true + % extra = [] +}). diff --git a/src/couch_views/rebar.config b/src/couch_views/rebar.config new file mode 100644 index 0000000..362c878 --- /dev/null +++ b/src/couch_views/rebar.config @@ -0,0 +1,14 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{cover_enabled, true}. +{cover_print_enabled, true}. diff --git a/src/couch_views/src/couch_views.app.src b/src/couch_views/src/couch_views.app.src new file mode 100644 index 0000000..9e1bbe7 --- /dev/null +++ b/src/couch_views/src/couch_views.app.src @@ -0,0 +1,31 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application, couch_views, + [{description, "CouchDB Views on FDB"}, + {vsn, git}, + {mod, {couch_views_app, []}}, + {registered, [ + couch_views_sup, + couch_views_worker_server + ]}, + {applications, [ + kernel, + stdlib, + erlfdb, + couch_log, + config, + couch_stats, + fabric, + couch_jobs + ]} + ]}. diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl new file mode 100644 index 0000000..4ccf0fa --- /dev/null +++ b/src/couch_views/src/couch_views.erl @@ -0,0 +1,115 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views). + +-export([ + map_query/6 +]). + +-include("couch_views.hrl"). + + +map_query(Db, DDoc, ViewName, Callback, Acc0, Args0) -> + Args = process_args(Args0), + #{name := DbName} = Db, + {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), + maybe_build_index(Db, Mrst, Args), + Resp = couch_views_reader:read(Db, DDoc, ViewName, Callback, Acc0, Args), + + UpdateAfter = maps:get(update, Args) == lazy, + if UpdateAfter == false -> ok; true -> + maybe_add_couch_job(Db, Mrst) + end, + Resp. 
+ + +process_args(#{} = Args) -> + Args1 = maps:filter(fun (_, V) -> V /= undefined end, Args), + + maps:merge(#{ + direction => fwd, + inclusive_end => true, + update => true, + skip => 0, + limit => ?MAX_VIEW_LIMIT + }, Args1). + + +maybe_build_index(_Db, _Mrst, #{update := false}) -> + false; + +maybe_build_index(_Db, _Mrst, #{update := lazy}) -> + false; + +maybe_build_index(Db, Mrst, _Args) -> + {Status, Seq} = fabric2_fdb:transactional(Db, fun(TxDb) -> + case view_up_to_date(TxDb, Mrst) of + {true, UpdateSeq} -> + {ready, UpdateSeq}; + {false, LatestSeq} -> + maybe_add_couch_job(TxDb, Mrst), + {false, LatestSeq} + end + end), + + if Status == ready -> true; true -> + subscribe_and_wait_for_index(Db, Mrst, Seq) + end. + + +view_up_to_date(Db, Mrst) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + UpdateSeq = couch_views_fdb:get_update_seq(TxDb, Mrst), + LastChange = fabric2_fdb:get_last_change(TxDb), + {UpdateSeq == LastChange, LastChange} + end). + + +maybe_add_couch_job(TxDb, Mrst) -> + case couch_views_jobs:status(TxDb, Mrst) of + running -> + ok; + pending -> + ok; + Status when Status == finished orelse Status == not_found -> + couch_views_jobs:add(TxDb, Mrst) + end. + + +subscribe_and_wait_for_index(Db, Mrst, Seq) -> + case couch_views_jobs:subscribe(Db, Mrst) of + {error, Error} -> + throw({error, Error}); + {ok, finished, _} -> + ready; + {ok, Subscription, _JobState, _} -> + wait_for_index_ready(Subscription, Db, Mrst, Seq) + end. + + +wait_for_index_ready(Subscription, Db, Mrst, Seq) -> + Out = couch_views_jobs:wait(Subscription), + case Out of + {finished, _JobData} -> + ready; + {pending, _JobData} -> + wait_for_index_ready(Subscription, Db, Mrst, Seq); + {running, #{last_seq := LastSeq}} -> + if LastSeq >= Seq -> ready; true -> % ready only once the indexer has reached the seq this query needs + wait_for_index_ready(Subscription, Db, Mrst, Seq) + end; + {running, _JobData} -> + wait_for_index_ready(Subscription, Db, Mrst, Seq); + {error, Error} -> + throw({error, Error}) + end.
diff --git a/src/couch_views/src/couch_views_app.erl b/src/couch_views/src/couch_views_app.erl new file mode 100644 index 0000000..5ede5ef --- /dev/null +++ b/src/couch_views/src/couch_views_app.erl @@ -0,0 +1,31 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-module(couch_views_app). + + +-behaviour(application). + + +-export([ + start/2, + stop/1 +]). + + +start(_StartType, StartArgs) -> + couch_views_sup:start_link(StartArgs). + + +stop(_State) -> + ok. diff --git a/src/couch_views/src/couch_views_encoding.erl b/src/couch_views/src/couch_views_encoding.erl new file mode 100644 index 0000000..3af6d7f --- /dev/null +++ b/src/couch_views/src/couch_views_encoding.erl @@ -0,0 +1,108 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_encoding). + + +-export([ + encode/1, + decode/1 +]). + + +-define(NULL, 16#00). +-define(FALSE, 16#26). +-define(TRUE, 16#27). +-define(NUMBER, 16#40). +-define(STRING, 16#41). +-define(LIST, 16#42). 
+-define(OBJECT, 16#43). + + +encode(X) -> + Encoded = encode_int(X), + erlfdb_tuple:pack(Encoded). + + +decode(EncodedVal) -> + Val = erlfdb_tuple:unpack(EncodedVal), + decode_int(Val). + + +encode_int(X) when is_atom(X) -> encode_atom(X); +encode_int(X) when is_number(X) -> encode_number(X); +encode_int(X) when is_binary(X) -> encode_binary(X); +encode_int(X) when is_list(X) -> encode_list(X); +encode_int(X) when is_tuple(X) -> encode_object(X). + + +encode_atom(null) -> + {?NULL}; + +encode_atom(false) -> + {?FALSE}; + +encode_atom(true) -> + {?TRUE}. + + +encode_number(Val) -> + {?NUMBER, float(Val)}. + + +encode_binary(Val) -> + % TODO add sort strings + {?STRING, Val}. + + +encode_list(List) -> + EncodedItems = lists:map(fun encode_int/1, List), + {?LIST, list_to_tuple(EncodedItems)}. + + +encode_object({Props}) -> + EncodedProps = lists:map(fun({K, V}) -> + EncodedK = encode_int(K), + EncodedV = encode_int(V), + {EncodedK, EncodedV} + end, Props), + {?OBJECT, list_to_tuple(EncodedProps)}. + + +decode_int({?NULL}) -> + null; + +decode_int({?FALSE}) -> + false; + +decode_int({?TRUE}) -> + true; + +decode_int({?STRING, String}) -> + String; + +decode_int({?NUMBER, Number}) -> + case Number == trunc(Number) of + true -> trunc(Number); % integral float: convert back to integer (float 0.0 never matches the pattern 0) + false -> Number + end; + +decode_int({?LIST, List}) -> + lists:map(fun decode_int/1, tuple_to_list(List)); + +decode_int({?OBJECT, Object}) -> + Props = lists:map(fun({EncodedK, EncodedV}) -> + K = decode_int(EncodedK), + V = decode_int(EncodedV), + {K, V} + end, tuple_to_list(Object)), + {Props}. diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl new file mode 100644 index 0000000..0791ffa --- /dev/null +++ b/src/couch_views/src/couch_views_fdb.erl @@ -0,0 +1,208 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License.
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_fdb). + +-export([ + get_update_seq/2, + update_view_seq/3, + get_seq_key/2, + + clear_id_index/4, + set_id_index/5, + get_id_index/4, + create_id_index_key/4, + + clear_map_index/5, + set_map_index_results/5, + get_map_index_key/4, + get_map_range_keys/3, + get_map_range/4, + unpack_map_row/3 +]). + + +-define(LIST_VALUE, 0). +-define(JSON_VALUE, 1). +-define(VALUE, 2). + + +-include_lib("fabric/src/fabric2.hrl"). +-include("couch_views.hrl"). + +% View Build Sequence Access +% (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence + +get_update_seq(Db, #mrst{sig = Sig}) -> + #{ + db_prefix := DbPrefix + } = Db, + + fabric2_fdb:transactional(Db, fun(TxDb) -> + Key = get_seq_key(Sig, DbPrefix), + Tx = maps:get(tx, TxDb), + case erlfdb:wait(erlfdb:get(Tx, Key)) of + not_found -> 0; + UpdateSeq -> UpdateSeq + end + end). + + +update_view_seq(Db, Sig, Seq) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + #{ + db_prefix := DbPrefix, + tx := Tx + } = TxDb, + SeqKey = get_seq_key(Sig, DbPrefix), + erlfdb:set(Tx, SeqKey, Seq) + end). + + +get_seq_key(Sig, DbPrefix) -> + erlfdb_tuple:pack({?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ}, DbPrefix). + + +% Id Index access + +% (<db>, ?VIEWS, <sig>, ?VIEW_ID_INDEX, <_id>, <view_id>) -> [emitted keys] + +clear_id_index(TxDb, Sig, DocId, IdxName) -> + #{ + db_prefix := DbPrefix, + tx := Tx + } = TxDb, + IdKey = create_id_index_key(DbPrefix, Sig, DocId, IdxName), + ok = erlfdb:clear(Tx, IdKey). 
+ + +set_id_index(TxDb, Sig, IdxName, DocId, IdxKey) -> + #{ + db_prefix := DbPrefix, + tx := Tx + } = TxDb, + IdKey = create_id_index_key(DbPrefix, Sig, DocId, IdxName), + erlfdb:set(Tx, IdKey, couch_views_encoding:encode(IdxKey)). + + +get_id_index(TxDb, Sig, Id, IdxName) -> + #{ + db_prefix := DbPrefix, + tx := Tx + } = TxDb, + IdKey = create_id_index_key(DbPrefix, Sig, Id, IdxName), + case erlfdb:wait(erlfdb:get(Tx, IdKey)) of + not_found -> not_found; + IdxKey -> couch_views_encoding:decode(IdxKey) + end. + + +create_id_index_key(DbPrefix, Sig, DocId, IdxName) -> + BaseIdKey = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, IdxName}, + erlfdb_tuple:pack(BaseIdKey, DbPrefix). + + +% Map Index Access +% {<db>, ?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, Idx, Key, DocId, +% RowType, Counter} = Values +% RowType = Emitted Keys or Emitted Value + + +clear_map_index(TxDb, Sig, IdxName, DocId, IdxKeys) when is_list(IdxKeys) -> + lists:foreach(fun (IdxKey) -> + clear_map_index(TxDb, Sig, IdxName, DocId, IdxKey) + end, IdxKeys); + +clear_map_index(TxDb, Sig, IdxName, DocId, IdxKey) -> + #{db_prefix := DbPrefix, tx := Tx} = TxDb, + Key = couch_views_encoding:encode(IdxKey), + BaseKey = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, IdxName, Key, DocId}, + {StartKey, EndKey} = erlfdb_tuple:range(BaseKey, DbPrefix), + ok = erlfdb:clear_range(Tx, StartKey, EndKey). + + +set_map_index_results(TxDb, Sig, IdxName, DocId, Results) -> + #{db_prefix := DbPrefix, tx := Tx} = TxDb, + lists:foldl(fun ({IdxKey, IdxValue}, Counter) -> + RowKey = create_map_key(DbPrefix, Sig, IdxName, IdxKey, DocId, + ?VIEW_ROW_KEY, Counter), + RowValue = create_map_key(DbPrefix, Sig, IdxName, IdxKey, DocId, + ?VIEW_ROW_VALUE, Counter), + + EncodedKey = pack_value(IdxKey), + EncodedValue = pack_value(IdxValue), + + ok = erlfdb:set(Tx, RowKey, EncodedKey), + ok = erlfdb:set(Tx, RowValue, EncodedValue), + Counter + 1 + end, 0, Results). 
+ + +get_map_index_key(#{db_prefix := DbPrefix}, Sig, IdxName, Key) -> + EncKey = couch_views_encoding:encode(Key), + erlfdb_tuple:pack({?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, + IdxName, EncKey}, DbPrefix). + + +get_map_range_keys(#{db_prefix := DbPrefix}, Sig, IdxName) -> + erlfdb_tuple:range({?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, IdxName}, DbPrefix). + + +get_map_range(TxDb, Start, End, Opts) -> + #{tx := Tx} = TxDb, + erlfdb:get_range(Tx, Start, End, Opts). + + +unpack_map_row(#{db_prefix := DbPrefix}, Key, Value) -> + case erlfdb_tuple:unpack(Key, DbPrefix) of + {?DB_VIEWS, _Sig, ?VIEW_MAP_RANGE, _Idx, _RowKey, Id, + ?VIEW_ROW_KEY, _Counter} -> + RowKey = unpack_value(Value), + {key, Id, RowKey}; + + {?DB_VIEWS, _Sig, ?VIEW_MAP_RANGE, _Idx, _RowValue, Id, + ?VIEW_ROW_VALUE, _Counter} -> + RowValue = unpack_value(Value), + {value, Id, RowValue} + end. + + +create_map_key(DbPrefix, Sig, IdxName, IdxKey, DocId, RowType, Counter) -> + Key = couch_views_encoding:encode(IdxKey), + BaseKey = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, + IdxName, Key, DocId, RowType, Counter}, + erlfdb_tuple:pack(BaseKey, DbPrefix). + + +% Internal used to packed and unpack Values + + +pack_value(Val) when is_list(Val) -> + erlfdb_tuple:pack({?LIST_VALUE, list_to_tuple(Val)}); + +pack_value(Val) when is_tuple(Val) -> + {Props} = Val, + erlfdb_tuple:pack({?JSON_VALUE, list_to_tuple(Props)}); + +pack_value(Val) -> + erlfdb_tuple:pack({?VALUE, Val}). + + +unpack_value(Bin) -> + case erlfdb_tuple:unpack(Bin) of + {?LIST_VALUE, Val} -> + tuple_to_list(Val); + {?JSON_VALUE, Val} -> + {tuple_to_list(Val)}; + {?VALUE, Val} -> + Val + end. diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl new file mode 100644 index 0000000..e9f0b41 --- /dev/null +++ b/src/couch_views/src/couch_views_indexer.erl @@ -0,0 +1,262 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_indexer). + +-export([ + update/2, + update/4, + + % For tests + map_docs/2, + write_doc/4 +]). + + +-include("couch_views.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("fabric/src/fabric2.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +% TODO: +% * Handle timeouts of transaction and other errors + +update(Db, Mrst) -> + Noop = fun (_, _, _, _, _, _) -> ok end, % arity 6 so it passes update/4's is_function(_, 6) guard + update(Db, Mrst, Noop, []). + + +update(#{} = Db, Mrst, ProgressCallback, ProgressArgs) + when is_function(ProgressCallback, 6) -> + try + Seq = couch_views_fdb:get_update_seq(Db, Mrst), + State = #{ + since_seq => Seq, + count => 0, + limit => config:get_integer("couch_views", "change_limit", 100), + doc_acc => [], + last_seq => Seq, + callback => ProgressCallback, + callback_args => ProgressArgs, + mrst => Mrst + }, + update_int(Db, State) + catch error:database_does_not_exist -> + #{db_prefix := DbPrefix} = Db, + couch_log:notice("couch_views_indexer stopped" + " - ~p database does not exist", [DbPrefix]) + end.
+ + +update_int(#{} = Db, State) -> + {ok, FinalState} = fabric2_fdb:transactional(Db, fun(TxDb) -> + State1 = maps:put(tx_db, TxDb, State), + fold_changes(State1) + end), + + #{ + count := Count, + limit := Limit, + doc_acc := DocAcc, + last_seq := LastSeq, + callback := Cb, + callback_args := CallbackArgs, + mrst := Mrst + } = FinalState, + + {MappedResults, Mrst1} = map_docs(Mrst, DocAcc), + write_docs(Db, Mrst1, MappedResults, FinalState), + + case Count < Limit of + true -> + Cb(undefined, finished, CallbackArgs, Db, Mrst, LastSeq); + false -> + NextState = maps:merge(FinalState, #{ + limit => Limit, + count => 0, + doc_acc => [], + since_seq => LastSeq, + last_seq => LastSeq, % keep progress: an empty next batch must not report seq 0 as finished + mrst => Mrst1 + }), + update_int(Db, NextState) + end. + + +fold_changes(State) -> + #{ + since_seq := SinceSeq, + limit := Limit, + tx_db := TxDb + } = State, + + fabric2_db:fold_changes(TxDb, SinceSeq, + fun process_changes/2, State, [{limit, Limit}]). + + +process_changes(Change, Acc) -> + #{ + doc_acc := DocAcc, + count := Count, + tx_db := TxDb, + mrst := Mrst + } = Acc, + + #{ + id := Id, + sequence := LastSeq, + deleted := Deleted + } = Change, + + IncludeDesign = lists:keymember(<<"include_design">>, 1, + Mrst#mrst.design_opts), + + Acc1 = case {Id, IncludeDesign} of + {<<"_design/", _/binary>>, false} -> + % {ok, Doc} = fabric2_db:open_doc(Db, Id), + maps:merge(Acc, #{ + count => Count + 1, + last_seq => LastSeq + }); + _ -> + + % Making a note here that we should make fetching all the docs + % a parallel fdb operation + Doc = if Deleted -> []; true -> + case fabric2_db:open_doc(TxDb, Id) of + {ok, Doc0} -> Doc0; + {not_found, _} -> [] + end + end, + + Change1 = maps:put(doc, Doc, Change), + maps:merge(Acc, #{ + doc_acc => DocAcc ++ [Change1], + count => Count + 1, + last_seq => LastSeq + }) + end, + {ok, Acc1}.
+ + +map_docs(Mrst, Docs) -> + % Run all the non deleted docs through the view engine and + Mrst1 = get_query_server(Mrst), + QServer = Mrst1#mrst.qserver, + + MapFun = fun + (#{deleted := true} = Change) -> + maps:put(results, [], Change); + + (Change) -> + #{doc := Doc} = Change, + couch_stats:increment_counter([couchdb, mrview, map_doc]), + {ok, RawResults} = couch_query_servers:map_doc_raw(QServer, Doc), + JsonResults = couch_query_servers:raw_to_ejson(RawResults), + ListResults = [[list_to_tuple(Res) || Res <- FunRs] + || FunRs <- JsonResults], + maps:put(results, ListResults, Change) + end, + MappedResults = lists:map(MapFun, Docs), + {MappedResults, Mrst1}. + + +start_query_server(#mrst{} = Mrst) -> + #mrst{ + language=Language, + lib=Lib, + views=Views + } = Mrst, + Defs = [View#mrview.def || View <- Views], + {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib), + Mrst#mrst{qserver=QServer}. + + +get_query_server(#mrst{} = Mrst) -> + case Mrst#mrst.qserver of + nil -> start_query_server(Mrst); + _ -> Mrst + end. + + +write_docs(Db, Mrst, Docs, State) -> + #mrst{ + views = Views, + sig = Sig + } = Mrst, + + #{ + callback := Cb, + callback_args := CallbackArgs + } = State, + + IdxNames = lists:map(fun (View) -> + View#mrview.id_num + end, Views), + + lists:foreach(fun (Doc) -> + #{sequence := Seq} = Doc, + fabric2_fdb:transactional(Db, fun(TxDb) -> + couch_views_fdb:update_view_seq(TxDb, Sig, Seq), + Cb(TxDb, update, CallbackArgs, Db, Mrst, Seq), + write_doc(TxDb, Sig, Doc, IdxNames) + end) + end, Docs). 
+ + +write_doc(TxDb, Sig, #{deleted := true} = Doc, ViewIds) -> + #{id := DocId} = Doc, + lists:foreach(fun (IdxName) -> + maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName) + end, ViewIds); + +write_doc(TxDb, Sig, Doc, ViewIds) -> + #{id := DocId, results := Results} = Doc, + lists:foreach(fun + ({IdxName, []}) -> + maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName); + ({IdxName, IdxResults}) -> + lists:foldl(fun (IdxResult, DocIdsCleared) -> + {IdxKey, _} = IdxResult, + OldIdxKey = couch_views_fdb:get_id_index(TxDb, Sig, + DocId, IdxName), + IsAlreadyCleared = lists:member(DocId, DocIdsCleared), + case OldIdxKey == not_found orelse IsAlreadyCleared == true of + true -> + couch_views_fdb:set_id_index(TxDb, Sig, IdxName, + DocId, IdxKey), + couch_views_fdb:set_map_index_results(TxDb, Sig, + IdxName, DocId, IdxResults); + false -> + couch_views_fdb:clear_id_index(TxDb, Sig, + DocId, IdxName), + couch_views_fdb:clear_map_index(TxDb, Sig, IdxName, + DocId, OldIdxKey), + couch_views_fdb:set_id_index(TxDb, Sig, IdxName, + DocId, IdxKey), % set_id_index/5 takes IdxName before DocId + couch_views_fdb:set_map_index_results(TxDb, Sig, + IdxName, DocId, IdxResults) + end, + [DocId | DocIdsCleared] + end, [], IdxResults) + end, lists:zip(ViewIds, Results)). + + +maybe_clear_id_and_map_index(TxDb, Sig, DocId, IdxName) -> + OldIdxKey = couch_views_fdb:get_id_index(TxDb, Sig, + DocId, IdxName), + if OldIdxKey == not_found -> ok; true -> + couch_views_fdb:clear_id_index(TxDb, Sig, + DocId, IdxName), + couch_views_fdb:clear_map_index(TxDb, Sig, IdxName, + DocId, OldIdxKey) + end. diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl new file mode 100644 index 0000000..ff99475 --- /dev/null +++ b/src/couch_views/src/couch_views_jobs.erl @@ -0,0 +1,122 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License.
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_jobs). + +-export([ + status/2, + add/2, + + accept/0, + get_job_data/1, + update/5, + finish/5, + set_timeout/0, + + subscribe/2, + wait/1, + unsubscribe/1, + + create_job_id/2 +]). + + +-include("couch_views.hrl"). + + +% Query request usage of jobs + + +status(TxDb, Mrst) -> + JobId = create_job_id(TxDb, Mrst), + + case couch_jobs:get_job_state(TxDb, ?INDEX_JOB_TYPE, JobId) of + {ok, State} -> State; + {error, not_found} -> not_found; + Error -> Error + end. + + +add(TxDb, Mrst) -> + JobData = create_job_data(TxDb, Mrst, 0), + + JobId = create_job_id(TxDb, Mrst), + JTx = couch_jobs_fdb:get_jtx(), + couch_jobs:add(JTx, ?INDEX_JOB_TYPE, JobId, JobData). + + +% couch_views_worker api + + +accept() -> + couch_jobs:accept(?INDEX_JOB_TYPE). + + +get_job_data(JobId) -> + couch_jobs:get_job_data(undefined, ?INDEX_JOB_TYPE, JobId). + + +update(JTx, Job, Db, Mrst, LastSeq) -> + JobData = create_job_data(Db, Mrst, LastSeq), + couch_jobs:update(JTx, Job, JobData). + + +finish(JTx, Job, Db, Mrst, LastSeq) -> + JobData = create_job_data(Db, Mrst, LastSeq), + couch_jobs:finish(JTx, Job, JobData). + + +set_timeout() -> + couch_jobs:set_type_timeout(?INDEX_JOB_TYPE, 6 * 1000). + + +% Watcher Job api + + +subscribe(Db, Mrst) -> + JobId = create_job_id(Db, Mrst), + couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId). + + +wait(JobSubscription) -> + case couch_jobs:wait(JobSubscription, infinity) of + {?INDEX_JOB_TYPE, _JobId, JobState, JobData} -> {JobState, JobData}; + {timeout} -> {error, timeout} + end. 
+ + +unsubscribe(JobSubscription) -> + couch_jobs:unsubscribe(JobSubscription). + + +% Internal + + +create_job_id(#{name := DbName}, #mrst{sig = Sig}) -> + create_job_id(DbName, Sig); + +create_job_id(DbName, Sig) -> + <<DbName/binary, Sig/binary>>. + + +create_job_data(Db, Mrst, LastSeq) -> + #{name := DbName} = Db, + + #mrst{ + idx_name = DDocId + } = Mrst, + + #{ + db_name => DbName, + ddoc_id => DDocId, + last_seq => LastSeq + }. diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl new file mode 100644 index 0000000..2ddb5b6 --- /dev/null +++ b/src/couch_views/src/couch_views_reader.erl @@ -0,0 +1,204 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_reader). + +-export([ + read/6 +]). + + +-include("couch_views.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("fabric/src/fabric2.hrl"). 
+ + +read(Db, DDoc, ViewName, Callback, Acc0, Args) -> + #{name := DbName} = Db, + + {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), + #mrst{ + sig = Sig, + views = Views + } = Mrst, + + IdxName = get_idx_name(ViewName, Views), + State0 = #{ + acc => Acc0, + skip => maps:get(skip, Args, 0), + include_docs => maps:get(include_docs, Args, false), + db => Db + }, + + DefaultOpts = [{streaming_mode, want_all}], + {Start, End, QueryOpts} = convert_args_to_fdb(Db, Sig, IdxName, Args, + DefaultOpts), + Opts = QueryOpts ++ DefaultOpts, + + fabric2_fdb:transactional(Db, fun(TxDb) -> + Future = couch_views_fdb:get_map_range(TxDb, Start, End, Opts), + + UnPack = get_unpack_fun(TxDb, Opts, Callback), + State1 = lists:foldl(UnPack, State0, erlfdb:wait(Future)), + + #{acc := Acc1} = State1, + Callback(complete, Acc1) + end). + + +get_idx_name(ViewName, Views) -> + {value, View} = lists:search(fun (View) -> + lists:member(ViewName, View#mrview.map_names) + end, Views), + View#mrview.id_num. + + +convert_args_to_fdb(Db, Sig, IdxName, Args, Opts) -> + #{ + direction := Direction + } = Args, + + {Start1, End1} = get_range_keys(Db, Sig, IdxName, Args), + + Opts1 = case maps:is_key(limit, Args) of + false -> + Opts; + true -> + Skip = maps:get(skip, Args, 0), + Limit = maps:get(limit, Args), + % Limit is multiplied by two because there are two rows per key + % value. + % Skip is added because that is done in the fold so we need + % to fetch the number of documents + % along with the docs we would skip. + % Limit = (Doc limit + Skip) * Num of Rows per Map KV + [{limit, (Limit + Skip) * 2} | Opts] + end, + + Opts2 = case Direction of + fwd -> + Opts1; + rev -> + [{reverse, true} | Opts1] + end, + {Start1, End1, Opts2}. 
+ + +get_range_keys(Db, Sig, IdxName, Args) -> + #{ + inclusive_end := InclusiveEnd, + direction := Direction + } = Args, + + {MapStartKey, MapEndKey} = case Direction of + fwd -> {start_key, end_key}; + rev -> {end_key, start_key} + end, + + {Start0, End0} = couch_views_fdb:get_map_range_keys(Db, Sig, IdxName), + + Start1 = case maps:is_key(MapStartKey, Args) of + false -> + Start0; + true -> + StartKey = maps:get(MapStartKey, Args), + Start = couch_views_fdb:get_map_index_key(Db, Sig, IdxName, + StartKey), + erlfdb_key:first_greater_or_equal(Start) + end, + + End1 = case maps:is_key(MapEndKey, Args) of + false -> + End0; + true -> + EndKey = maps:get(MapEndKey, Args), + EndBin = couch_views_fdb:get_map_index_key(Db, Sig, IdxName, + EndKey), + EndBin1 = case InclusiveEnd of + true -> <<EndBin/binary, 16#FF>>; + false -> EndBin + end, + erlfdb_key:first_greater_than(EndBin1) + end, + {Start1, End1}. + + +get_unpack_fun(TxDb, Opts, Callback) -> + UnPackFwd = fun({K, V}, State) -> + case couch_views_fdb:unpack_map_row(TxDb, K, V) of + {key, _Id, RowKey} -> + maps:put(current_key, RowKey, State); + {value, Id, RowValue} -> + #{ + current_key := RowKey, + acc := Acc, + skip := Skip, + db := Db + } = State, + + case Skip > 0 of + true -> + maps:put(skip, Skip - 1, State); + false -> + Row = [{id, Id}, {key, RowKey}, {value, RowValue}], + + IncludeDoc = maps:get(include_docs, State, false), + Row1 = maybe_include_doc(Db, Id, Row, IncludeDoc), + + {ok, AccNext} = Callback({row, Row1}, Acc), + maps:put(acc, AccNext, State) + end + end + end, + + UnPackRev = fun({K, V}, State) -> + case couch_views_fdb:unpack_map_row(TxDb, K, V) of + {key, Id, RowKey} -> + #{ + current_value := RowValue, + acc := Acc, + skip := Skip, + db := Db + } = State, + + case Skip > 0 of + true -> + maps:put(skip, Skip - 1, State); + false -> + Row = [{id, Id}, {key, RowKey}, {value, RowValue}], + + IncludeDoc = maps:get(include_docs, State, false), + Row1 = maybe_include_doc(Db, Id, Row, 
IncludeDoc), + + {ok, AccNext} = Callback({row, Row1}, Acc), + maps:put(acc, AccNext, State) + end; + {value, _Id, RowValue} -> + maps:put(current_value, RowValue, State) + end + end, + + case lists:keyfind(reverse, 1, Opts) of + {reverse, true} -> UnPackRev; + _ -> UnPackFwd + end. + + +maybe_include_doc(_Db, _Id, Row, false) -> + Row; + +maybe_include_doc(Db, Id, Row, true) -> + Doc1 = case fabric2_db:open_doc(Db, Id) of + {ok, Doc} -> couch_doc:to_json_obj(Doc, []); + {not_found, _} -> [] + end, + Row ++ [{doc, Doc1}]. diff --git a/src/couch_views/src/couch_views_sup.erl b/src/couch_views/src/couch_views_sup.erl new file mode 100644 index 0000000..da7d796 --- /dev/null +++ b/src/couch_views/src/couch_views_sup.erl @@ -0,0 +1,46 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-module(couch_views_sup). + + +-behaviour(supervisor). + + +-export([ + start_link/1 +]). + + +-export([ + init/1 +]). + + +start_link(Args) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, Args). + + +init([]) -> + Flags = #{ + strategy => one_for_one, + intensity => 1, + period => 5 + }, + Children = [ + #{ + id => couch_views_worker_server, + start => {couch_views_worker_server, start_link, []} + } + ], + {ok, {Flags, Children}}. 
diff --git a/src/couch_views/src/couch_views_util.erl b/src/couch_views/src/couch_views_util.erl new file mode 100644 index 0000000..d7ed29f --- /dev/null +++ b/src/couch_views/src/couch_views_util.erl @@ -0,0 +1,83 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_util). + + +-export([ + ddoc_to_mrst/2 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include("couch_views.hrl"). + + +ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> + MakeDict = fun({Name, {MRFuns}}, DictBySrcAcc) -> + case couch_util:get_value(<<"map">>, MRFuns) of + MapSrc when MapSrc /= undefined -> + RedSrc = couch_util:get_value(<<"reduce">>, MRFuns, null), + {ViewOpts} = couch_util:get_value(<<"options">>, MRFuns, {[]}), + View = case dict:find({MapSrc, ViewOpts}, DictBySrcAcc) of + {ok, View0} -> View0; + error -> #mrview{def=MapSrc, options=ViewOpts} + end, + {MapNames, RedSrcs} = case RedSrc of + null -> + MNames = [Name | View#mrview.map_names], + {MNames, View#mrview.reduce_funs}; + _ -> + RedFuns = [{Name, RedSrc} | View#mrview.reduce_funs], + {View#mrview.map_names, RedFuns} + end, + View2 = View#mrview{map_names=MapNames, reduce_funs=RedSrcs}, + dict:store({MapSrc, ViewOpts}, View2, DictBySrcAcc); + undefined -> + DictBySrcAcc + end; + ({Name, Else}, DictBySrcAcc) -> + couch_log:error("design_doc_to_view_group ~s views ~p", + [Name, Else]), + DictBySrcAcc + end, + {DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}), + SeqIndexed = 
proplists:get_value(<<"seq_indexed">>, DesignOpts, false), + KeySeqIndexed = proplists:get_value(<<"keyseq_indexed">>, + DesignOpts, false), + Partitioned = proplists:get_value(<<"partitioned">>, DesignOpts, false), + + {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}), + BySrc = lists:foldl(MakeDict, dict:new(), RawViews), + + NumViews = fun({_, View}, N) -> + {View#mrview{id_num=N, seq_indexed=SeqIndexed, + keyseq_indexed=KeySeqIndexed}, N+1} + end, + {Views, _} = lists:mapfoldl(NumViews, 0, lists:sort(dict:to_list(BySrc))), + + Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>), + Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}), + + IdxState = #mrst{ + db_name=DbName, + idx_name=Id, + lib=Lib, + views=Views, + language=Language, + design_opts=DesignOpts, + seq_indexed=SeqIndexed, + keyseq_indexed=KeySeqIndexed, + partitioned=Partitioned + }, + SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)}, + {ok, IdxState#mrst{sig=couch_hash:md5_hash(term_to_binary(SigInfo))}}. diff --git a/src/couch_views/src/couch_views_worker.erl b/src/couch_views/src/couch_views_worker.erl new file mode 100644 index 0000000..fa641d5 --- /dev/null +++ b/src/couch_views/src/couch_views_worker.erl @@ -0,0 +1,44 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_worker). + +-export([ + start/2, + job_progress/6 +]). 
+ + +start(Job, JobData) -> + {ok, Db, Mrst} = get_indexing_info(JobData), + % maybe we should spawn here + couch_views_indexer:update(Db, Mrst, fun job_progress/6, Job). + + +job_progress(Tx, Progress, Job, Db, Mrst, LastSeq) -> + case Progress of + update -> + couch_views_jobs:update(Tx, Job, Db, Mrst, LastSeq); + finished -> + couch_views_jobs:finish(Tx, Job, Db, Mrst, LastSeq) + end. + + +get_indexing_info(JobData) -> + #{ + <<"db_name">> := DbName, + <<"ddoc_id">> := DDocId + } = JobData, + {ok, Db} = fabric2_db:open(DbName, []), + {ok, DDoc} = fabric2_db:open_doc(Db, DDocId), + {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), + {ok, Db, Mrst}. diff --git a/src/couch_views/src/couch_views_worker_server.erl b/src/couch_views/src/couch_views_worker_server.erl new file mode 100644 index 0000000..1c815e5 --- /dev/null +++ b/src/couch_views/src/couch_views_worker_server.erl @@ -0,0 +1,110 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_worker_server). + + +-behaviour(gen_server). + + +-export([ + start_link/0 +]). + + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-define(TYPE_CHECK_PERIOD_DEFAULT, 500). +-define(MAX_JITTER_DEFAULT, 100). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +init(_) -> + couch_views_jobs:set_timeout(), + schedule_check(), + {ok, #{}}. + + +terminate(_, _St) -> + ok. 
+ + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info(check_for_jobs, State) -> + accept_jobs(), + schedule_check(), + {noreply, State}; + +handle_info({'DOWN', _Ref, process, Pid, Reason}, St) -> + LogMsg = "~p : process ~p exited with ~p", + couch_log:error(LogMsg, [?MODULE, Pid, Reason]), + {noreply, St}; + +handle_info(Msg, St) -> + couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]), + {noreply, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +accept_jobs() -> + case couch_views_jobs:accept() of + not_found -> + ok; + {ok, Job, JobData} -> + start_worker(Job, JobData), + % keep accepting jobs until not_found + accept_jobs() + end. + + +start_worker(Job, JobData) -> + % TODO Should I monitor it, or let jobs do that? + spawn_monitor(fun () -> couch_views_worker:start(Job, JobData) end), + ok. + + +schedule_check() -> + Timeout = get_period_msec(), + MaxJitter = max(Timeout div 2, get_max_jitter_msec()), + Wait = Timeout + rand:uniform(max(1, MaxJitter)), + timer:send_after(Wait, self(), check_for_jobs). + + +get_period_msec() -> + config:get_integer("couch_views", "type_check_period_msec", + ?TYPE_CHECK_PERIOD_DEFAULT). + + +get_max_jitter_msec() -> + config:get_integer("couch_views", "type_check_max_jitter_msec", + ?MAX_JITTER_DEFAULT). diff --git a/src/couch_views/test/couch_views_encoding_test.erl b/src/couch_views/test/couch_views_encoding_test.erl new file mode 100644 index 0000000..a73cb42 --- /dev/null +++ b/src/couch_views/test/couch_views_encoding_test.erl @@ -0,0 +1,73 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_encoding_test). + +-include_lib("eunit/include/eunit.hrl"). + +val_encoding_test() -> + Values = [ + null, + true, + 1.0, + <<"a">>, + {[{<<"a">>, 1.0}, {<<"b">>, <<"hello">>}]} + ], + lists:foreach(fun (Val) -> + EncVal = couch_views_encoding:encode(Val), + ?assertEqual(Val, couch_views_encoding:decode(EncVal)) + end, Values). + + +correct_ordering_test() -> + Ordered = [ + % Special values sort before all other types + null, + false, + true, + + % Then numbers + % 1, + % 2, + % 3.0, + % 4, + + 1.0, + 2.0, + 3.0, + 4.0, + + [<<"a">>], + [<<"b">>], + [<<"b">>, <<"c">>], + [<<"b">>, <<"c">>, <<"a">>], + [<<"b">>, <<"d">>], + [<<"b">>, <<"d">>, <<"e">>], + + % Then objects, compared each key value in the list until different. + % Larger objects sort after their subset objects + {[{<<"a">>, 1.0}]}, + {[{<<"a">>, 2.0}]}, + {[{<<"b">>, 1.0}]}, + {[{<<"b">>, 2.0}]}, + + % Member order does matter for collation + {[{<<"b">>, 2.0}, {<<"a">>, 1.0}]}, + {[{<<"b">>, 2.0}, {<<"c">>, 2.0}]} + + ], + + BinList = lists:map(fun couch_views_encoding:encode/1, Ordered), + SortedBinList = lists:sort(BinList), + DecodedBinList = lists:map(fun couch_views_encoding:decode/1, + SortedBinList), + ?assertEqual(Ordered, DecodedBinList). 
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl new file mode 100644 index 0000000..2d192a6 --- /dev/null +++ b/src/couch_views/test/couch_views_indexer_test.erl @@ -0,0 +1,258 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_indexer_test). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). + + +-define(TDEF(A), {atom_to_list(A), fun A/0}). + +setup() -> + test_util:start_couch([fabric]). + + +teardown(State) -> + test_util:stop_couch(State). + + +foreach_setup() -> + ok. + + +foreach_teardown(_) -> + meck:unload(). + + +index_server_test_() -> + { + "Test Couch Views indexer", + { + setup, + fun setup/0, + fun teardown/1, + { + foreach, + fun foreach_setup/0, fun foreach_teardown/1, + [ + ?TDEF(map_docs_no_results_for_deleted), + ?TDEF(map_docs_returns_sorted_results), + ?TDEF(write_doc_clears_for_deleted_doc), + ?TDEF(write_doc_adds_for_new_doc), + ?TDEF(write_doc_clears_and_sets_for_update), + ?TDEF(write_doc_clears_for_no_new_update), + ?TDEF(write_doc_clears_and_updates_duplicates) + ] + } + + } + }. 
+ + +map_docs_no_results_for_deleted() -> + DbName = ?tempdb, + + DDoc = create_ddoc(), + {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), + + Doc = #{ + id => <<"id">>, + sequence => <<1111>>, + rev_id => <<"1-123">>, + deleted => true + }, + + meck:expect(couch_query_servers, start_doc_map, fun(_, _, _) -> + {ok, fake} + end), + + {Results, _} = couch_views_indexer:map_docs(Mrst, [Doc]), + + [#{results := DocResult}] = Results, + ?assertEqual([], DocResult). + + +map_docs_returns_sorted_results() -> + DbName = ?tempdb, + Doc = #{ + id => <<"id">>, + sequence => <<1111>>, + rev_id => <<"1-123">>, + doc => doc(1) + }, + + CompleteResult = [[{1, 1}], []], + + DDoc = create_ddoc(), + {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), + + + {Results, _} = couch_views_indexer:map_docs(Mrst, [Doc]), + [#{results := DocResult}] = Results, + ?assertEqual(CompleteResult, DocResult). + + +write_doc_clears_for_deleted_doc() -> + TxDb = #{}, + Sig = <<123>>, + Doc = #{deleted => true, id => 1}, + ViewIds = [1], + OldIdxKey = old_key, + + meck:expect(couch_views_fdb, get_id_index, 4, old_key), + meck:expect(couch_views_fdb, clear_id_index, 4, ok), + meck:expect(couch_views_fdb, clear_map_index, 5, ok), + + couch_views_indexer:write_doc(TxDb, Sig, Doc, ViewIds), + ?assert(meck:called(couch_views_fdb, get_id_index, [TxDb, Sig, 1, 1])), + ?assert(meck:called(couch_views_fdb, clear_id_index, [TxDb, Sig, 1, 1])), + ?assert(meck:called(couch_views_fdb, clear_map_index, + [TxDb, Sig, 1, 1, OldIdxKey])), + ?assertEqual(length(meck:history(couch_views_fdb)), 3). 
+ + +write_doc_adds_for_new_doc() -> + TxDb = #{}, + Sig = <<123>>, + Key = <<"key">>, + Value = 1, + Results = [{Key, Value}], + Doc = #{ + deleted => false, + id => 1, + results => [Results] + }, + ViewIds = [1], + + meck:expect(couch_views_fdb, get_id_index, 4, not_found), + meck:expect(couch_views_fdb, set_id_index, 5, ok), + meck:expect(couch_views_fdb, set_map_index_results, 5, ok), + + couch_views_indexer:write_doc(TxDb, Sig, Doc, ViewIds), + ?assert(meck:called(couch_views_fdb, get_id_index, [TxDb, Sig, 1, 1])), + ?assert(meck:called(couch_views_fdb, set_id_index, + [TxDb, Sig, 1, 1, Key])), + ?assert(meck:called(couch_views_fdb, set_map_index_results, + [TxDb, Sig, 1, 1, Results])), + ?assertEqual(length(meck:history(couch_views_fdb)), 3). + + +write_doc_clears_and_sets_for_update() -> + TxDb = #{}, + Sig = <<123>>, + Key = <<"key">>, + Value = 1, + Results = [{Key, Value}], + Doc = #{ + deleted => false, + id => 1, + results => [Results] + }, + ViewIds = [1], + OldKey = oldkey, + + meck:expect(couch_views_fdb, get_id_index, 4, OldKey), + meck:expect(couch_views_fdb, clear_id_index, 4, ok), + meck:expect(couch_views_fdb, clear_map_index, 5, ok), + meck:expect(couch_views_fdb, set_id_index, 5, ok), + meck:expect(couch_views_fdb, set_map_index_results, 5, ok), + + couch_views_indexer:write_doc(TxDb, Sig, Doc, ViewIds), + ?assert(meck:called(couch_views_fdb, get_id_index, [TxDb, Sig, 1, 1])), + ?assert(meck:called(couch_views_fdb, clear_id_index, [TxDb, Sig, 1, 1])), + ?assert(meck:called(couch_views_fdb, clear_map_index, + [TxDb, Sig, 1, 1, OldKey])), + ?assert(meck:called(couch_views_fdb, set_id_index, + [TxDb, Sig, 1, 1, Key])), + ?assert(meck:called(couch_views_fdb, set_map_index_results, + [TxDb, Sig, 1, 1, Results])), + ?assertEqual(length(meck:history(couch_views_fdb)), 5). 
+ + +write_doc_clears_for_no_new_update() -> + TxDb = #{}, + Sig = <<123>>, + Results = [], + Doc = #{ + deleted => false, + id => 1, + results => [Results] + }, + ViewIds = [1], + OldKey = oldkey, + + meck:expect(couch_views_fdb, get_id_index, 4, OldKey), + meck:expect(couch_views_fdb, clear_id_index, 4, ok), + meck:expect(couch_views_fdb, clear_map_index, 5, ok), + + couch_views_indexer:write_doc(TxDb, Sig, Doc, ViewIds), + ?assert(meck:called(couch_views_fdb, get_id_index, [TxDb, Sig, 1, 1])), + ?assert(meck:called(couch_views_fdb, clear_id_index, [TxDb, Sig, 1, 1])), + ?assert(meck:called(couch_views_fdb, clear_map_index, + [TxDb, Sig, 1, 1, OldKey])), + ?assertEqual(length(meck:history(couch_views_fdb)), 3). + + +write_doc_clears_and_updates_duplicates() -> + TxDb = #{}, + Sig = <<123>>, + Key = <<"key">>, + Results = [{Key, 1}, {Key, 2}], + Doc = #{ + deleted => false, + id => 1, + results => [Results] + }, + ViewIds = [1], + OldKey = oldkey, + + meck:expect(couch_views_fdb, get_id_index, 4, OldKey), + meck:expect(couch_views_fdb, clear_id_index, 4, ok), + meck:expect(couch_views_fdb, clear_map_index, 5, ok), + meck:expect(couch_views_fdb, set_id_index, 5, ok), + meck:expect(couch_views_fdb, set_map_index_results, 5, ok), + + couch_views_indexer:write_doc(TxDb, Sig, Doc, ViewIds), + ?assertEqual(meck:num_calls(couch_views_fdb, get_id_index, + [TxDb, Sig, 1, 1]), 2), + ?assertEqual(meck:num_calls(couch_views_fdb, clear_id_index, + [TxDb, Sig, 1, 1]), 1), + ?assertEqual(meck:num_calls(couch_views_fdb, set_id_index, + [TxDb, Sig, 1, 1, Key]), 2), + ?assertEqual(meck:num_calls(couch_views_fdb, clear_map_index, + [TxDb, Sig, 1, 1, OldKey]), 1), + ?assertEqual(meck:num_calls(couch_views_fdb, set_map_index_results, + [TxDb, Sig, 1, 1, Results]), 2), + ?assertEqual(length(meck:history(couch_views_fdb)), 8). 
+ + +create_ddoc() -> + couch_doc:from_json_obj({[ + {<<"_id">>, <<"_design/bar">>}, + {<<"views">>, {[ + {<<"map_fun1">>, {[ + {<<"map">>, <<"function(doc) {emit(doc.val, doc.val);}">>} + ]}}, + {<<"map_fun2">>, {[ + {<<"map">>, <<"function(doc) {}">>} + ]}} + ]}} + ]}). + + +doc(Id) -> + couch_doc:from_json_obj({[ + {<<"_id">>, list_to_binary(integer_to_list(Id))}, + {<<"val">>, Id} + ]}). diff --git a/src/couch_views/test/couch_views_map_test.erl b/src/couch_views/test/couch_views_map_test.erl new file mode 100644 index 0000000..bbad93f --- /dev/null +++ b/src/couch_views/test/couch_views_map_test.erl @@ -0,0 +1,484 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_map_test). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +-define(TDEF(A), {atom_to_list(A), fun A/0}). + + +setup() -> + test_util:start_couch([fabric, couch_jobs, couch_views]). + + +teardown(State) -> + test_util:stop_couch(State). 
+ + +map_views_test_() -> + { + "Map views", + { + setup, + fun setup/0, + fun teardown/1, + [ + ?TDEF(should_map), + ?TDEF(should_map_with_startkey), + ?TDEF(should_map_with_endkey), + ?TDEF(should_map_with_endkey_not_inclusive), + ?TDEF(should_map_reverse_and_limit), + ?TDEF(should_map_with_range_reverse), + ?TDEF(should_map_with_limit_and_skip), + ?TDEF(should_map_with_limit_and_skip_reverse), + ?TDEF(should_map_with_include_docs), + ?TDEF(should_map_with_include_docs_reverse), + ?TDEF(should_map_with_startkey_with_key_array), + ?TDEF(should_map_with_startkey_and_endkey_with_key_array), + ?TDEF(should_map_empty_views), + ?TDEF(should_map_duplicate_keys), + ?TDEF(should_map_with_doc_emit), + ?TDEF(should_map_update_is_false), + ?TDEF(should_map_update_is_lazy) + % fun should_give_ext_size_seq_indexed_test/1 + ] + } + }. + + +should_map() -> + Result = run_query(<<"baz">>, #{}), + Expect = {ok, [ + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}, + {row, [{id, <<"6">>}, {key, 6}, {value, 6}]}, + {row, [{id, <<"7">>}, {key, 7}, {value, 7}]}, + {row, [{id, <<"8">>}, {key, 8}, {value, 8}]}, + {row, [{id, <<"9">>}, {key, 9}, {value, 9}]}, + {row, [{id, <<"10">>}, {key, 10}, {value, 10}]} + ]}, + ?assertEqual(Expect, Result). + + +should_map_with_startkey() -> + Result = run_query(<<"baz">>, #{start_key => 4}), + Expect = {ok, [ + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}, + {row, [{id, <<"6">>}, {key, 6}, {value, 6}]}, + {row, [{id, <<"7">>}, {key, 7}, {value, 7}]}, + {row, [{id, <<"8">>}, {key, 8}, {value, 8}]}, + {row, [{id, <<"9">>}, {key, 9}, {value, 9}]}, + {row, [{id, <<"10">>}, {key, 10}, {value, 10}]} + ]}, + ?assertEqual(Expect, Result). 
+ + +should_map_with_endkey() -> + Result = run_query(<<"baz">>, #{end_key => 5}), + Expect = {ok, [ + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect, Result). + + +should_map_with_endkey_not_inclusive() -> + Result = run_query(<<"baz">>, #{ + end_key => 5, + inclusive_end => false + }), + Expect = {ok, [ + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]} + ]}, + ?assertEqual(Expect, Result). + + +should_map_reverse_and_limit() -> + Result = run_query(<<"baz">>, #{ + direction => rev, + limit => 3 + }), + Expect = {ok, [ + {row, [{id, <<"10">>}, {key, 10}, {value, 10}]}, + {row, [{id, <<"9">>}, {key, 9}, {value, 9}]}, + {row, [{id, <<"8">>}, {key, 8}, {value, 8}]} + ]}, + ?assertEqual(Expect, Result). + + +should_map_with_range_reverse() -> + Result = run_query(<<"baz">>, #{ + direction => rev, + start_key => 5, + end_key => 3, + inclusive_end => true + }), + Expect = {ok, [ + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]} + ]}, + ?assertEqual(Expect, Result). + + +should_map_with_limit_and_skip() -> + Result = run_query(<<"baz">>, #{ + start_key => 2, + limit => 3, + skip => 3 + }), + Expect = {ok, [ + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}, + {row, [{id, <<"6">>}, {key, 6}, {value, 6}]}, + {row, [{id, <<"7">>}, {key, 7}, {value, 7}]} + ]}, + ?assertEqual(Expect, Result). 
+ + +should_map_with_limit_and_skip_reverse() -> + Result = run_query(<<"baz">>, #{ + start_key => 10, + limit => 3, + skip => 3, + direction => rev + }), + Expect = {ok, [ + {row, [{id, <<"7">>}, {key, 7}, {value, 7}]}, + {row, [{id, <<"6">>}, {key, 6}, {value, 6}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect, Result). + + +should_map_with_include_docs() -> + Result = run_query(<<"baz">>, #{ + start_key => 8, + end_key => 8, + include_docs => true + }), + Doc = {[ + {<<"_id">>, <<"8">>}, + {<<"_rev">>, <<"1-55b9a29311341e07ec0a7ca13bc1b59f">>}, + {<<"val">>, 8} + ]}, + Expect = {ok, [ + {row, [{id, <<"8">>}, {key, 8}, {value, 8}, {doc, Doc}]} + ]}, + ?assertEqual(Expect, Result). + + +should_map_with_include_docs_reverse() -> + Result = run_query(<<"baz">>, #{ + start_key => 8, + end_key => 8, + include_docs => true, + direction => rev + }), + Doc = {[ + {<<"_id">>, <<"8">>}, + {<<"_rev">>, <<"1-55b9a29311341e07ec0a7ca13bc1b59f">>}, + {<<"val">>, 8} + ]}, + Expect = {ok, [ + {row, [{id, <<"8">>}, {key, 8}, {value, 8}, {doc, Doc}]} + ]}, + ?assertEqual(Expect, Result). + + +should_map_with_startkey_with_key_array() -> + Rows = [ + {row, [{id, <<"4">>}, {key, [<<"4">>, 4]}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, [<<"5">>, 5]}, {value, 5}]}, + {row, [{id, <<"6">>}, {key, [<<"6">>, 6]}, {value, 6}]}, + {row, [{id, <<"7">>}, {key, [<<"7">>, 7]}, {value, 7}]}, + {row, [{id, <<"8">>}, {key, [<<"8">>, 8]}, {value, 8}]}, + {row, [{id, <<"9">>}, {key, [<<"9">>, 9]}, {value, 9}]} + ], + + Result = run_query(<<"boom">>, #{ + start_key => [<<"4">>] + }), + + ?assertEqual({ok, Rows}, Result), + + ResultRev = run_query(<<"boom">>, #{ + start_key => [<<"9">>, 9], + direction => rev, + limit => 6 + }), + + ?assertEqual({ok, lists:reverse(Rows)}, ResultRev). 
+ + +should_map_with_startkey_and_endkey_with_key_array() -> + Rows = [ + {row, [{id, <<"4">>}, {key, [<<"4">>, 4]}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, [<<"5">>, 5]}, {value, 5}]}, + {row, [{id, <<"6">>}, {key, [<<"6">>, 6]}, {value, 6}]}, + {row, [{id, <<"7">>}, {key, [<<"7">>, 7]}, {value, 7}]}, + {row, [{id, <<"8">>}, {key, [<<"8">>, 8]}, {value, 8}]} + ], + + Result = run_query(<<"boom">>, #{ + start_key => [<<"4">>], + end_key => [<<"8">>, []] + }), + + ?assertEqual({ok, Rows}, Result), + + ResultRev = run_query(<<"boom">>, #{ + start_key => [<<"8">>, []], + end_key => [<<"4">>], + direction => rev + }), + + ?assertEqual({ok, lists:reverse(Rows)}, ResultRev), + + ResultRev2 = run_query(<<"boom">>, #{ + start_key => [<<"9">>, 9], + end_key => [<<"4">>], + direction => rev, + inclusive_end => false + }), + + ?assertEqual({ok, lists:reverse(Rows)}, ResultRev2). + + +should_map_empty_views() -> + Result = run_query(<<"bing">>, #{}), + Expect = {ok, []}, + ?assertEqual(Expect, Result). + + +should_map_with_doc_emit() -> + Result = run_query(<<"doc_emit">>, #{ + start_key => 8, + limit => 1 + }), + Doc = {[ + {<<"_id">>, <<"8">>}, + {<<"_rev">>, <<"1-55b9a29311341e07ec0a7ca13bc1b59f">>}, + {<<"val">>, 8} + ]}, + Expect = {ok, [ + {row, [{id, <<"8">>}, {key, 8}, {value, Doc}]} + ]}, + ?assertEqual(Expect, Result). + + +should_map_duplicate_keys() -> + Result = run_query(<<"duplicate_keys">>, #{ + limit => 6 + }), + Expect = {ok, [ + {row, [{id, <<"1">>}, {key, <<"1">>}, {value, 1}]}, + {row, [{id, <<"1">>}, {key, <<"1">>}, {value, 2}]}, + {row, [{id, <<"10">>}, {key, <<"10">>}, {value, 10}]}, + {row, [{id, <<"10">>}, {key, <<"10">>}, {value, 11}]}, + {row, [{id, <<"2">>}, {key, <<"2">>}, {value, 2}]}, + {row, [{id, <<"2">>}, {key, <<"2">>}, {value, 3}]} + ]}, + ?debugFmt("EXPE ~p ~n", [Expect]), + ?assertEqual(Expect, Result). 
+ + +should_map_update_is_false() -> + Expect = {ok, [ + {row, [{id, <<"8">>}, {key, 8}, {value, 8}]}, + {row, [{id, <<"9">>}, {key, 9}, {value, 9}]}, + {row, [{id, <<"10">>}, {key, 10}, {value, 10}]} + ]}, + + Expect1 = {ok, [ + {row, [{id, <<"8">>}, {key, 8}, {value, 8}]}, + {row, [{id, <<"9">>}, {key, 9}, {value, 9}]}, + {row, [{id, <<"10">>}, {key, 10}, {value, 10}]}, + {row, [{id, <<"11">>}, {key, 11}, {value, 11}]} + ]}, + + Idx = <<"baz">>, + DbName = ?tempdb(), + + {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]), + + DDoc = create_ddoc(), + Docs = make_docs(10), + fabric2_db:update_docs(Db, [DDoc | Docs]), + + Args1 = #{ + start_key => 8 + }, + + Result1 = couch_views:map_query(Db, DDoc, Idx, fun default_cb/2, + [], Args1), + ?assertEqual(Expect, Result1), + + Doc = doc(11), + fabric2_db:update_doc(Db, Doc), + + Args2 = #{ + start_key => 8, + update => false + }, + + Result2 = couch_views:map_query(Db, DDoc, Idx, fun default_cb/2, + [], Args2), + ?assertEqual(Expect, Result2), + + Result3 = couch_views:map_query(Db, DDoc, Idx, fun default_cb/2, + [], Args1), + ?assertEqual(Expect1, Result3). 
+ + +should_map_update_is_lazy() -> + Expect = {ok, [ + {row, [{id, <<"8">>}, {key, 8}, {value, 8}]}, + {row, [{id, <<"9">>}, {key, 9}, {value, 9}]}, + {row, [{id, <<"10">>}, {key, 10}, {value, 10}]} + ]}, + + Idx = <<"baz">>, + DbName = ?tempdb(), + + {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]), + + DDoc = create_ddoc(), + Docs = make_docs(10), + + fabric2_db:update_docs(Db, [DDoc | Docs]), + + Args1 = #{ + start_key => 8, + update => lazy + }, + + Result1 = couch_views:map_query(Db, DDoc, Idx, fun default_cb/2, + [], Args1), + ?assertEqual({ok, []}, Result1), + + {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), + {ok, Subscription, _, _} = couch_views_jobs:subscribe(Db, Mrst), + couch_jobs:wait(Subscription, finished, 1000), + + Args2 = #{ + start_key => 8, + update => false + }, + + Result2 = couch_views:map_query(Db, DDoc, Idx, fun default_cb/2, + [], Args2), + ?assertEqual(Expect, Result2). + + +% should_give_ext_size_seq_indexed_test(Db) -> +% DDoc = couch_doc:from_json_obj({[ +% {<<"_id">>, <<"_design/seqdoc">>}, +% {<<"options">>, {[{<<"seq_indexed">>, true}]}}, +% {<<"views">>, {[ +% {<<"view1">>, {[ +% {<<"map">>, <<"function(doc){emit(doc._id, doc._id);}">>} +% ]}} +% ]} +% } +% ]}), +% {ok, _} = couch_db:update_doc(Db, DDoc, []), +% {ok, Db1} = couch_db:open_int(couch_db:name(Db), []), +% {ok, DDoc1} = couch_db:open_doc(Db1, <<"_design/seqdoc">>, [ejson_body]), +% couch_mrview:query_view(Db1, DDoc1, <<"view1">>, [{update, true}]), +% {ok, Info} = couch_mrview:get_info(Db1, DDoc), +% Size = couch_util:get_nested_json_value({Info}, [sizes, external]), +% ok = couch_db:close(Db1), +% ?assert(is_number(Size)). + + +run_query(Idx, Args) -> + DbName = ?tempdb(), + {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]), + DDoc = create_ddoc(), + Docs = make_docs(10), + fabric2_db:update_docs(Db, [DDoc | Docs]), + couch_views:map_query(Db, DDoc, Idx, fun default_cb/2, [], Args). 
+
+
+% Accumulating callback for couch_views:map_query/6 in these tests: rows
+% are prepended and the list is reversed on `complete'; a `{final, Info}'
+% message contributes `Info' only when no rows were collected, and
+% `(ok, ddoc_updated)' is passed straight through.
+default_cb(Msg, Acc) ->
+    case {Msg, Acc} of
+        {complete, _} ->
+            {ok, lists:reverse(Acc)};
+        {{final, Info}, []} ->
+            {ok, [Info]};
+        {{final, _}, _} ->
+            {ok, Acc};
+        {ok, ddoc_updated} ->
+            {ok, ddoc_updated};
+        {Row, _} ->
+            {ok, [Row | Acc]}
+    end.
+
+
+% Design doc `_design/bar' holding every map view exercised by this suite.
+create_ddoc() ->
+    MapView = fun(Name, Source) ->
+        {Name, {[{<<"map">>, Source}]}}
+    end,
+    Views = [
+        MapView(<<"baz">>, <<"function(doc) {emit(doc.val, doc.val);}">>),
+        MapView(<<"boom">>, <<
+            "function(doc) {\n"
+            " emit([doc.val.toString(), doc.val], doc.val);\n"
+            "}"
+        >>),
+        MapView(<<"bing">>, <<"function(doc) {}">>),
+        MapView(<<"doc_emit">>, <<"function(doc) {emit(doc.val, doc)}">>),
+        MapView(<<"duplicate_keys">>, <<
+            "function(doc) {\n"
+            " emit(doc._id, doc.val);\n"
+            " emit(doc._id, doc.val + 1);\n"
+            "}"
+        >>),
+        MapView(<<"zing">>, <<
+            "function(doc) {\n"
+            " if(doc.foo !== undefined)\n"
+            " emit(doc.foo, 0);\n"
+            "}"
+        >>)
+    ],
+    couch_doc:from_json_obj({[
+        {<<"_id">>, <<"_design/bar">>},
+        {<<"views">>, {Views}}
+    ]}).
+
+
+% Docs 1..Count, each built by doc/1.
+make_docs(Count) ->
+    lists:map(fun doc/1, lists:seq(1, Count)).
+
+
+% A doc whose `_id' is the decimal text of `Id' and whose `val' is `Id'.
+doc(Id) ->
+    couch_doc:from_json_obj({[
+        {<<"_id">>, integer_to_binary(Id)},
+        {<<"val">>, Id}
+    ]}).
diff --git a/src/fabric/src/fabric2.hrl b/src/fabric/src/fabric2.hrl
index de1d3d1..6392d12 100644
--- a/src/fabric/src/fabric2.hrl
+++ b/src/fabric/src/fabric2.hrl
@@ -46,6 +46,7 @@
 -define(DB_DOCS, 21).
 -define(DB_LOCAL_DOCS, 22).
 -define(DB_ATTS, 23).
+-define(DB_VIEWS, 24).
 
 
 % Versions
diff --git a/src/fabric/src/fabric2_view.erl b/src/fabric/src/fabric2_view.erl
new file mode 100644
index 0000000..01c9ab0
--- /dev/null
+++ b/src/fabric/src/fabric2_view.erl
@@ -0,0 +1,81 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.
You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_view).
+
+-export([
+    query/7
+]).
+
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+%% @doc Execute the view `ViewName' of design doc `DDoc' on `Db'.
+%%
+%% `QueryArgs0' is an `#mrargs{}' record. It is validated with
+%% couch_mrview_util:validate_args/3, converted to a map by
+%% mrargs_to_map/1 and handed to couch_views:map_query/6 together with
+%% `Callback' and `Acc0'; the result of that call is returned.
+%% `Options' is only used to open a cluster db handle when `Db' is a
+%% users db. Reduce views are not supported yet: is_reduce_view/1 always
+%% returns false, so every query currently runs as a map query.
+query(Db, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
+    DbName = fabric2_db:name(Db),
+    case fabric_util:is_users_db(DbName) of
+        true ->
+            % The return value is discarded; presumably this is called for
+            % its side effect (an access check on the ddoc) -- TODO confirm.
+            FakeDb = fabric_util:open_cluster_db(DbName, Options),
+            couch_users_db:after_doc_read(DDoc, FakeDb);
+        false ->
+            ok
+    end,
+    QueryArgs1 = couch_mrview_util:validate_args(Db, DDoc, QueryArgs0),
+    case is_reduce_view(QueryArgs1) of
+        true ->
+            throw({not_implemented});
+        false ->
+            MapQueryArgs = mrargs_to_map(QueryArgs1),
+            couch_views:map_query(Db, DDoc, ViewName, Callback,
+                Acc0, MapQueryArgs)
+    end.
+
+
+%% Placeholder until reduce views are implemented: every view is treated
+%% as a map view.
+is_reduce_view(_) ->
+    false.
+
+
+mrargs_to_map(#mrargs{} = Args) ->
+    #{
+        start_key => Args#mrargs.start_key,
+        start_key_docid => Args#mrargs.start_key_docid,
+        end_key => Args#mrargs.end_key,
+        end_key_docid => Args#mrargs.end_key_docid,
+        keys => Args#mrargs.keys,
+        direction => Args#mrargs.direction,
+        limit => Args#mrargs.limit,
+        skip => Args#mrargs.skip,
+        update => Args#mrargs.update,
+        multi_get => Args#mrargs.multi_get,
+        inclusive_end => Args#mrargs.inclusive_end,
+        include_docs => Args#mrargs.include_docs,
+        doc_options => Args#mrargs.doc_options,
+        update_seq => Args#mrargs.update_seq,
+        conflicts => Args#mrargs.conflicts,
+        sorted => Args#mrargs.sorted
+    }.
+
+
diff --git a/test/elixir/test/map_test.exs b/test/elixir/test/map_test.exs
new file mode 100644
index 0000000..b7a809d
--- /dev/null
+++ b/test/elixir/test/map_test.exs
@@ -0,0 +1,222 @@
+defmodule ViewMapTest do
+  @moduledoc """
+  Test Map functionality for views
+  """
+
+  use CouchTestCase
+
+  def get_ids(resp) do
+    %{:body => %{"rows" => rows}} = resp
+    Enum.map(rows, fn row -> row["id"] end)
+  end
+
+  # Creates doc-id-1..doc-id-10 with `value` = i; every third doc is in
+  # group "one", the rest are in group "two".
+  defp create_map_docs(db_name) do
+    docs =
+      for i <- 1..10 do
+        group =
+          if rem(i, 3) == 0 do
+            "one"
+          else
+            "two"
+          end
+
+        %{
+          :_id => "doc-id-#{i}",
+          :value => i,
+          :some => "field",
+          :group => group
+        }
+      end
+
+    resp = Couch.post("/#{db_name}/_bulk_docs", body: %{:docs => docs})
+    assert resp.status_code == 201
+  end
+
+  setup do
+    db_name = random_db_name()
+    {:ok, _} = create_db(db_name)
+    on_exit(fn -> delete_db(db_name) end)
+
+    create_map_docs(db_name)
+
+    # Emits regular docs keyed by `value`, and design docs under key 0 so
+    # the `include_design` option can be verified.
+    map_fun1 = """
+      function(doc) {
+        if (doc.some) {
+            emit(doc.value , doc.value);
+        }
+
+        if (doc._id.indexOf("_design") > -1) {
+            emit(0, "ddoc")
+        }
+      }
+    """
+
+    map_fun2 = """
+      function(doc) {
+        if (doc.group) {
+          emit([doc.some, doc.group], 1);
+        }
+      }
+    """
+
+    body = %{
+      :docs => [
+        %{
+          _id: "_design/map",
+          views: %{
+            some: %{map: map_fun1},
+            map_some: %{map: map_fun2}
+          }
+        },
+        %{
+          _id: "_design/include_ddocs",
+          views: %{some: %{map: map_fun1}},
+          options: %{include_design: true}
+        }
+      ]
+    }
+
+    resp = Couch.post("/#{db_name}/_bulk_docs", body: body)
+    assert resp.status_code == 201
+    Enum.each(resp.body, &assert(&1["ok"]))
+
+    {:ok, [db_name: db_name]}
+  end
+
+  def get_reduce_result(resp) do
+    %{:body => %{"rows" => rows}} = resp
+    rows
+  end
+
+  test "query returns docs", context do
+    db_name = context[:db_name]
+
+    url = "/#{db_name}/_design/map/_view/some"
+    resp = Couch.get(url)
+    assert resp.status_code == 200
+
+    ids = get_ids(resp)
+
+    assert ids == [
+             "doc-id-1",
+             "doc-id-2",
+             "doc-id-3",
+             "doc-id-4",
+             "doc-id-5",
+             "doc-id-6",
+             "doc-id-7",
+             "doc-id-8",
+             "doc-id-9",
+             "doc-id-10"
+           ]
+
+    url = "/#{db_name}/_design/map/_view/map_some"
+    resp = Couch.get(url)
+    assert resp.status_code == 200
+
+    ids = get_ids(resp)
+
+    # Keys are [some, group]: group "one" (3, 6, 9) sorts before "two";
+    # rows sharing a key are ordered by doc id.
+    assert ids == [
+             "doc-id-3",
+             "doc-id-6",
+             "doc-id-9",
+             "doc-id-1",
+             "doc-id-10",
+             "doc-id-2",
+             "doc-id-4",
+             "doc-id-5",
+             "doc-id-7",
+             "doc-id-8"
+           ]
+  end
+
+  test "updated docs rebuilds index", context do
+    db_name = context[:db_name]
+
+    url = "/#{db_name}/_design/map/_view/some"
+    resp = Couch.get(url)
+    assert resp.status_code == 200
+    ids = get_ids(resp)
+
+    assert ids == [
+             "doc-id-1",
+             "doc-id-2",
+             "doc-id-3",
+             "doc-id-4",
+             "doc-id-5",
+             "doc-id-6",
+             "doc-id-7",
+             "doc-id-8",
+             "doc-id-9",
+             "doc-id-10"
+           ]
+
+    # doc-id-5 moves to key 0 (first) and doc-id-6 to key 100 (last).
+    update_doc_value(db_name, "doc-id-5", 0)
+    update_doc_value(db_name, "doc-id-6", 100)
+
+    # Deleting doc-id-3 must drop it from the index.
+    resp = Couch.get("/#{db_name}/doc-id-3")
+    doc3 = convert(resp.body)
+    resp = Couch.delete("/#{db_name}/#{doc3["_id"]}", query: %{rev: doc3["_rev"]})
+    assert resp.status_code == 200
+
+    # doc-id-4 loses `some`, so map_fun1 no longer emits it.
+    resp = Couch.get("/#{db_name}/doc-id-4")
+    doc4 = convert(resp.body)
+    doc4 = Map.delete(doc4, "some")
+    resp = Couch.put("/#{db_name}/#{doc4["_id"]}", body: doc4)
+    assert resp.status_code == 201
+
+    # An unrelated new field on doc-id-1 must leave its row in place.
+    resp = Couch.get("/#{db_name}/doc-id-1")
+    doc1 = convert(resp.body)
+    doc1 = Map.put(doc1, "another", "value")
+    resp = Couch.put("/#{db_name}/#{doc1["_id"]}", body: doc1)
+    assert resp.status_code == 201
+
+    url = "/#{db_name}/_design/map/_view/some"
+    resp = Couch.get(url)
+    assert resp.status_code == 200
+    ids = get_ids(resp)
+
+    assert ids == [
+             "doc-id-5",
+             "doc-id-1",
+             "doc-id-2",
+             "doc-id-7",
+             "doc-id-8",
+             "doc-id-9",
+             "doc-id-10",
+             "doc-id-6"
+           ]
+  end
+
+  test "can index design docs", context do
+    db_name = context[:db_name]
+
+    url = "/#{db_name}/_design/include_ddocs/_view/some"
+    resp = Couch.get(url, query: %{limit: 3})
+    assert resp.status_code == 200
+    ids = get_ids(resp)
+
+    assert ids == ["_design/include_ddocs", "_design/map", "doc-id-1"]
+  end
+
+  def update_doc_value(db_name, id, value) do
+    resp = Couch.get("/#{db_name}/#{id}")
+    doc = convert(resp.body)
+    doc = Map.put(doc, "value", value)
+    resp = Couch.put("/#{db_name}/#{id}", body: doc)
+    assert resp.status_code == 201
+  end
+
+  def convert(value) do
+    :jiffy.decode(:jiffy.encode(value), [:return_maps])
+  end
+end
