This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5a3273d2b8e5aa79562a5745d290f81569122094
Author: Paul J. Davis <paul.joseph.da...@gmail.com>
AuthorDate: Thu Oct 25 17:03:00 2018 -0500

    Implement partitioned dbs

    This change introduces the ability for users to place a group of
    documents in a single shard range by specifying a "partition key" in
    the document id. A partition key is everything preceding the first
    colon ':' in the document id. Every document id (except for design
    documents) in a partitioned database is required to have a partition
    key.

    Co-authored-by: Garren Smith <garren.sm...@gmail.com>
    Co-authored-by: Robert Newson <rnew...@apache.org>
---
 src/chttpd/src/chttpd_db.erl      | 28 +++++++++++++++++--
 src/couch/src/couch_db.erl        | 10 +++++++
 src/couch/src/couch_doc.erl       |  6 ++++-
 src/couch/src/couch_partition.erl | 57 +++++++++++++++++++++++++++++++++++++++
 src/couch/src/couch_server.erl    |  3 +++
 src/fabric/src/fabric_util.erl    | 10 ++++++-
 src/mem3/src/mem3.erl             |  8 +++++-
 7 files changed, 117 insertions(+), 5 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 49d7b58..af516db 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -285,10 +285,12 @@ create_db_req(#httpd{}=Req, DbName) ->
     Q = chttpd:qs_value(Req, "q", config:get("cluster", "q", "8")),
     P = chttpd:qs_value(Req, "placement", config:get("cluster", "placement")),
     EngineOpt = parse_engine_opt(Req),
+    DbProps = parse_partitioned_opt(Req),
     Options = [
         {n, N},
         {q, Q},
-        {placement, P}
+        {placement, P},
+        {props, DbProps}
     ] ++ EngineOpt,
     DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
     case fabric:create_db(DbName, Options) of
@@ -314,7 +316,12 @@ delete_db_req(#httpd{}=Req, DbName) ->
     end.

 do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
-    {ok, Db} = couch_db:clustered_db(DbName, Ctx),
+    Shard = hd(mem3:shards(DbName)),
+    Props = couch_util:get_value(props, Shard#shard.opts, []),
+    {ok, Db} = couch_db:clustered_db(DbName, [
+        {user_ctx, Ctx},
+        {props, Props}
+    ]),
     Fun(Req, Db).

 db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
@@ -1453,6 +1460,23 @@ parse_engine_opt(Req) ->
         end
     end.

+
+parse_partitioned_opt(Req) ->
+    case chttpd:qs_value(Req, "partitioned") of
+        undefined ->
+            [];
+        "false" ->
+            [];
+        "true" ->
+            [
+                {partitioned, true},
+                {hash, [couch_partition, hash, []]}
+            ];
+        _ ->
+            throw({bad_request, <<"invalid `partitioned` parameter">>})
+    end.
+
+
 parse_doc_query({Key, Value}, Args) ->
     case {Key, Value} of
     {"attachments", "true"} ->
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 9b731c9..5e945e3 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -56,6 +56,7 @@
     is_db/1,
     is_system_db/1,
     is_clustered/1,
+    is_partitioned/1,

     set_revs_limit/2,
     set_purge_infos_limit/2,
@@ -214,6 +215,15 @@ is_clustered(#db{}) ->
 is_clustered(?OLD_DB_REC = Db) ->
     ?OLD_DB_MAIN_PID(Db) == undefined.

+is_partitioned(#db{options = Options}) ->
+    Props = couch_util:get_value(props, Options, []),
+    case couch_util:get_value(partitioned, Props) of
+        true ->
+            true;
+        _ ->
+            false
+    end.
+
 ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
     ok = gen_server:call(Pid, full_commit, infinity),
     {ok, StartTime}.
diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index f960ec5..22f899f 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -16,7 +16,7 @@
 -export([from_json_obj/1, from_json_obj_validate/1]).
 -export([from_json_obj/2, from_json_obj_validate/2]).
 -export([to_json_obj/2, has_stubs/1, merge_stubs/2]).
--export([validate_docid/1, validate_docid/2, get_validate_doc_fun/1]).
+-export([validate_docid/1, validate_docid/2, validate_docid/3, get_validate_doc_fun/1]).
 -export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]).
 -export([doc_from_multi_part_stream/4]).
 -export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
@@ -199,11 +199,15 @@ parse_revs(_) ->

 validate_docid(DocId, DbName) ->
+    validate_docid(DocId, DbName, fun(_) -> ok end).
+
+validate_docid(DocId, DbName, Extra) ->
     case DbName =:= ?l2b(config:get("mem3", "shards_db", "_dbs"))
             andalso lists:member(DocId, ?SYSTEM_DATABASES) of
     true ->
         ok;
     false ->
+        Extra(DocId),
         validate_docid(DocId)
     end.
diff --git a/src/couch/src/couch_partition.erl b/src/couch/src/couch_partition.erl
new file mode 100644
index 0000000..ea8decb
--- /dev/null
+++ b/src/couch/src/couch_partition.erl
@@ -0,0 +1,57 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_partition).
+
+
+-export([
+    extract/1,
+    from_docid/1,
+    is_member/2,
+
+    hash/1
+]).
+
+
+extract(Value) when is_binary(Value) ->
+    case binary:split(Value, <<":">>) of
+        [Partition, Rest] ->
+            {Partition, Rest};
+        _ ->
+            undefined
+    end.
+
+
+from_docid(DocId) ->
+    case extract(DocId) of
+        undefined ->
+            throw({illegal_docid, <<"doc id must be of form partition:id">>});
+        {Partition, _} ->
+            Partition
+    end.
+
+
+hash(<<"_design/", _/binary>> = DocId) ->
+    erlang:crc32(DocId);
+
+hash(DocId) when is_binary(DocId) ->
+    erlang:crc32(from_docid(DocId)).
+
+
+is_member(Partition, DocId) ->
+    case extract(DocId) of
+        {Partition, _} ->
+            true;
+        _ ->
+            false
+    end.
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index c4b7bf1..95892fc 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -221,6 +221,9 @@ init([]) ->
     % Mark pluggable storage engines as a supported feature
     config:enable_feature('pluggable-storage-engines'),

+    % Mark partitioned databases as a supported feature
+    config:enable_feature(partitions),
+
     % read config and register for configuration changes
     % just stop if one of the config settings change. couch_server_sup
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index b595def..c519ea5 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -19,7 +19,7 @@
 -export([stream_start/2, stream_start/4]).
 -export([log_timeout/2, remove_done_workers/2]).
 -export([is_users_db/1, is_replicator_db/1]).
--export([make_cluster_db/1, make_cluster_db/2]).
+-export([is_partitioned/1]).
 -export([upgrade_mrargs/1]).

 -compile({inline, [{doc_id_and_rev,1}]}).
@@ -326,6 +326,14 @@
 doc_id_and_rev(#doc{id=DocId, revs={RevNum, [RevHash|_]}}) ->
     {DocId, {RevNum, RevHash}}.

+is_partitioned(DbName0) when is_binary(DbName0) ->
+    Shards = mem3:shards(fabric:dbname(DbName0)),
+    is_partitioned(make_cluster_db(hd(Shards)));
+
+is_partitioned(Db) ->
+    couch_db:is_partitioned(Db).
+
+
 upgrade_mrargs(#mrargs{} = Args) ->
     Args;
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index ae52104..3c6a388 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -13,7 +13,7 @@
 -module(mem3).

 -export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
-    choose_shards/2, n/1, n/2, dbname/1, ushards/1]).
+    choose_shards/2, n/1, n/2, dbname/1, ushards/1, ushards/2]).
 -export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]).
 -export([sync_security/0, sync_security/1]).
 -export([compare_nodelists/0, compare_shards/1]).
@@ -136,6 +136,12 @@ ushards(DbName) ->
     Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap),
     mem3_util:downcast(Shards).

+-spec ushards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+ushards(DbName, DocId) ->
+    Shards = shards_int(DbName, DocId, [ordered]),
+    Shard = hd(Shards),
+    mem3_util:downcast([Shard]).
+
 ushards(DbName, Shards0, ZoneMap) ->
     {L,S,D} = group_by_proximity(Shards0, ZoneMap),
     % Prefer shards in the local zone over shards in a different zone,
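
As a quick illustration of the convention the commit message describes, the
standalone sketch below restates the partition-key rules outside the CouchDB
tree: the partition is everything before the first colon in the document id,
design documents are exempt, and the shard key hashes only the partition so
that every document in a partition lands in the same shard range. The module
and function names (partition_key_demo, partition/1, shard_key/1) are
illustrative only and are not part of this patch; inside the tree the
equivalent logic lives in couch_partition above.

    %% Illustrative only -- not part of the commit. Restates the partition-key
    %% convention: partition = everything before the first colon, design docs
    %% are exempt, and the shard key hashes the partition rather than the
    %% whole doc id.
    -module(partition_key_demo).
    -export([demo/0]).

    partition(<<"_design/", _/binary>>) ->
        %% Design documents carry no partition key.
        undefined;
    partition(DocId) when is_binary(DocId) ->
        case binary:split(DocId, <<":">>) of
            [Partition, _Rest] ->
                Partition;
            _ ->
                erlang:error({illegal_docid, DocId})
        end.

    shard_key(<<"_design/", _/binary>> = DocId) ->
        %% Design docs hash on their full id, as couch_partition:hash/1 does.
        erlang:crc32(DocId);
    shard_key(DocId) when is_binary(DocId) ->
        erlang:crc32(partition(DocId)).

    demo() ->
        <<"sensor-a">> = partition(<<"sensor-a:reading-001">>),
        %% Two ids sharing a partition key produce the same shard key, so
        %% they map to the same shard range.
        true = shard_key(<<"sensor-a:reading-001">>) =:=
               shard_key(<<"sensor-a:reading-002">>),
        ok.

Under those assumptions, partition_key_demo:demo() returns ok in an Erlang
shell, which mirrors what the patch wires in above: because every id in a
partition hashes identically, the {hash, [couch_partition, hash, []]} db prop
and mem3:ushards/2 can route a partitioned doc id to a single shard.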