This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch opentracing-davisp in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 2732c28d1c7ebff52230ad29fa26755bdd8a44d2 Author: ILYA Khlopotov <[email protected]> AuthorDate: Tue Oct 22 13:00:18 2019 +0000 Implement ctrace application --- .gitignore | 5 + rebar.config.script | 5 + src/ctrace/README.md | 330 ++++++++++++++++++++++++++ src/ctrace/src/ctrace.app.src | 27 +++ src/ctrace/src/ctrace.erl | 255 ++++++++++++++++++++ src/ctrace/src/ctrace_action.erl | 38 +++ src/ctrace/src/ctrace_app.erl | 26 ++ src/ctrace/src/ctrace_config.erl | 203 ++++++++++++++++ src/ctrace/src/ctrace_dsl.erl | 202 ++++++++++++++++ src/ctrace/src/ctrace_filter.erl | 45 ++++ src/ctrace/src/ctrace_sampler.erl | 36 +++ src/ctrace/src/ctrace_sup.erl | 43 ++++ src/ctrace/test/exunit/ctrace_config_test.exs | 153 ++++++++++++ src/ctrace/test/exunit/ctrace_dsl_test.exs | 157 ++++++++++++ src/ctrace/test/exunit/ctrace_test.exs | 88 +++++++ src/ctrace/test/exunit/test_helper.exs | 2 + 16 files changed, 1615 insertions(+) diff --git a/.gitignore b/.gitignore index 3c8bf0d..2de464c 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ .venv .DS_Store .rebar/ +.rebar3/ .erlfdb/ .eunit/ log @@ -54,16 +55,20 @@ src/hyper/ src/ibrowse/ src/ioq/ src/hqueue/ +src/jaeger_passage/ src/jiffy/ src/ken/ src/khash/ +src/local/ src/meck/ src/mochiweb/ src/oauth/ +src/passage/ src/proper/ src/rebar/ src/smoosh/ src/snappy/ +src/thrift_protocol/ src/triq/ tmp/ diff --git a/rebar.config.script b/rebar.config.script index 8ef1abc..11cd426 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -87,6 +87,7 @@ SubDirs = [ "src/couch_peruser", "src/couch_tests", "src/couch_views", + "src/ctrace", "src/ddoc_cache", "src/dreyfus", "src/fabric", @@ -122,6 +123,10 @@ DepDescs = [ {jiffy, "jiffy", {tag, "CouchDB-0.14.11-2"}}, {mochiweb, "mochiweb", {tag, "v2.19.0"}}, {meck, "meck", {tag, "0.8.8"}}, +{passage, {url, "https://github.com/sile/passage.git"}, + {tag, "0.2.6"}}, +{jaeger_passage, {url, "https://github.com/sile/jaeger_passage.git"}, + {tag, "0.1.12"}}, %% TMP - Until this is 
moved to a proper Apache repo {erlfdb, "erlfdb", {branch, "master"}} diff --git a/src/ctrace/README.md b/src/ctrace/README.md new file mode 100644 index 0000000..3c7e580 --- /dev/null +++ b/src/ctrace/README.md @@ -0,0 +1,330 @@ +Notes: + +* Filtering on the entire tree +* Filtering based on time +* Only report upstream spans when some child span condition is met +* Stuff +* Problems with filters + - Child spans have already been ignored by the time we get to + the end of a parent span + - Don't have tags or time at sample time + - Rules have to account for every level in the span tree unless + its a `(#{}) -> [report]` rule which is not super powerful + - Triggering reports based on child spans will lose information + on previous child spans since they've already been ignored + before a filter condition returns true + +* Filtering spans vs not is awkward since it applies to the + top level span or something? Do they just pick the point where + we start reporting spans? + + + +Overview +======== + +This application provides an interface to opentracing compatible +tracing systems. + +Open Tracing +------------ + +[//]: # (taken from https://github.com/opentracing/specification/blob/master/specification.md) +Traces in OpenTracing are defined implicitly by their Spans. +In particular, a Trace can be thought of as a directed acyclic +graph (DAG) of Spans, where the edges between Spans are called +References. + +Each Span encapsulates the following state: + +- An operation name +- A start timestamp +- A finish timestamp +- A set of zero or more key:value Span Tags. +- A set of zero or more Span Logs, each of which is + itself a key:value map paired with a timestamp. +- A SpanContext +- References to zero or more causally-related Spans + +Every trace is identified by unique trace_id. +Every trace includes zero or more tracing spans. +Which are identified by span id. + +Jaeger +------ + +Jaeger is a distributed tracing system released as open source by Uber Technologies. 
+It is one of the implementations of the OpenTracing specification. +Jaeger supports Trace detail view where a single trace is represented as +a tree of tracing spans with detailed timing information about every span. +In order to make this feature work all tracing spans should form a lineage +from the same root span. + + + +Implementation +============== + +Every operation has a unique identifier. Example identifiers are: + +- all-dbs.read +- database.delete +- replication.trigger +- view.compaction + +The tracing begins with sampling. We don't do anything if +sampler is not configured for a given operation. + +For every operation we start a separate tracing filter process. +This process receives spans when `ctrace:finish_span` is called. +The filtering processes are named after operation name. + +Every filtering process has a number of filtering rules. +The first rule matching the conditions of a given span would be selected. +Every rule has a list of actions to execute on given span. + +Currently we only support `report`. In the future we might support following: + +- report +- count +- sample = probability (float) + +When the rule is selected and it has `report` action we forward the span to +`reporter` process. Reporter process encodes the span and sends it to jaeger. + +Span pipeline +------------- + ++-------+ +------+ +--------+ +------+ +|sampler| ----> |filter| ----> |reporter| ----> |jaeger| ++-------+ +------+ +--------+ +------+ + +- sampler - adds filtering rules into span and does prefiltering +- filter - uses filtering rules to decide if it needs to forward span +- reporter - sends span to jaeger + +Code instrumentation +-------------------- + +The span lifecycle is controlled by + +- `ctrace:start_span` +- `ctrace:finish_span` + +The instrumentation can add tags and logs to a span. In some cases we +embed span in other structures. Therefore to avoid confusion we don't +use term `span` and use `subject` instead.
Currently we support `#httpd{}` +record and `db` as subjects. + +Example of instrumentation: +``` +HttpReq2 = ctrace:trace(HttpReq1, fun(S0) -> + S1 = ctrace:tag(S0, #{ + peer => Peer, + 'http.method' => Method, + nonce => Nonce, + 'http.url' => Path, + 'span.kind' => <<"server">>, + component => <<"couchdb.chttpd">> + }), + ctrace:log(S1, #{ + field0 => "value0" + }) +end), +``` + +As you can see the `ctrace:trace/2` function receives a function which +operates on the span. The functions that can be used on a span are: + +- `ctrace:tag/2` to add new tags to the span +- `ctrace:set_operation_name/2` sometimes operation name is + not available when span is started. This function lets + us set the name later. +- `ctrace:log/2` add log event to the span + +There are some informative functions as well: + +- `ctrace:refs/1` - returns all other spans we have references from the current +- `ctrace:operation_name/1` - returns operation name for the current span +- `ctrace:trace_id/1` - returns trace id for the current span +- `ctrace:span_id/1` - returns span id for the current span + +Instrumentation guide +--------------------- + +- Start root span at system boundaries + - httpd + - internal trigger (replication or compaction jobs) +- Start new child span when you cross layer boundaries +- Start new child span when you cross node boundary +- Extend `<app>_httpd_handlers:handler_info/1` as needed to + have operation ids. (We as community might need to work on + naming conventions) +- Use `ctrace:new_request_ctx` to pass additional information + about request. +- Update layers to pass `request_ctx` as needed (not done for jobs).
+- Use [span conventions](https://github.com/apache/couchdb-documentation/blob/master/rfcs/011-opentracing.md#conventions) https://github.com/opentracing/specification/blob/master/semantic_conventions.md +- When in doubt consult open tracing spec + - [spec overview](https://github.com/opentracing/specification/blob/master/specification.md) + - [conventions](https://github.com/opentracing/specification/blob/master/semantic_conventions.md#standard-span-tags-and-log-fields) + +Configuration +------------- + +The tracers are configured using standard CouchDB ini files +based configuration. There is a global toggle +`[tracing]->'enabled'` (default `false`) which enables the tracing when set to `true`. +The samplers are configured in a `[tracing.samplers]` section, which +specifies the sampler to use for given tracer. If sampler is not +configured the spans for a given operation are dropped. Every sampler +must have a corresponding filter section. The naming convention is: +`[tracing.OperationId]`. For security reasons `[tracing.OperationId]` +is not available via HTTP endpoint. Administrator can toggle tracing +with predefined rules for specific operation by setting the corresponding +sampler to either `null` or `all`. +The keys in filter section are irrelevant and used only for ordering +purpose. The rules are processed in the alphabetical order. We use a +DSL for defining rules.
The DSL has the following structure: +``` +( #{<[arguments]>} ) when <[conditions]> -> <[actions]> +``` + +Where: + - arguments is comma separated pairs of + `<tag_or_field_name> := <variable_name>` + - actions is a list which contains + - `report` + - conditions + - `<[condition]>` + - `| <[condition]> <[operator]> <[condition]>` + - condition: + - `<variable_name> <[operator]> <value>` + `| <[guard_function]>(<[variable_name]>)` + - `variable_name` - capitalized name (an Erlang variable) without special characters + - guard_function: one of + - `is_atom` + - `is_float` + - `is_integer` + - `is_list` + - `is_number` + - `is_pid` + - `is_port` + - `is_reference` + - `is_tuple` + - `is_map` + - `is_binary` + - `is_function` + - `element` - `element(n, tuple)` + - `abs` + - `hd` - return head of the list + - `length` + - `map_get` + - `map_size` + - `round` + - `node` + - `size` - returns size of the tuple + - `bit_size` - returns number of bits in binary + - `byte_size` - returns number of bytes in binary + - `tl` - return tail of a list + - `trunc` + - `self` + - operator: one of + - `not` + - `and` - evaluates both expressions + - `andalso` - evaluates second only when first is true + - `or` - evaluates both expressions + - `orelse` - evaluates second only when first is false + - `xor` + - `+` + - `-` + - `*` + - `div` + - `rem` + - `band` - bitwise AND + - `bor` - bitwise OR + - `bxor` - bitwise XOR + - `bnot` - bitwise NOT + - `bsl` - arithmetic bitshift left + - `bsr` - bitshift right + - `>` + - `>=` + - `<` + - `=<` + - `=:=` + - `==` + - `=/=` + - `/=` - not equal + +### Open tracing agent configuration + +``` +[tracing] + +thrift_format = compact ; compact | binary +agent_host = 127.0.0.1 +agent_port = 6831 +; app_name is the value which would be used for +; `location.application` tag +app_name = couchdb +``` + + +Below is an example of a configuration: + +```ini +[tracing] +enabled = true +thrift_format = compact ; compact | binary +agent_host = jaeger.local +agent_port = 6831 +;
app_name is the value which would be used for +; `location.application` tag +app_name = couchdb + +[tracing.samplers] + +view.build = all +database-info.read = all + +[tracing.view.build] + +a_select = (#{'view.name' := Name}) when Name == "blablabla" -> [report] +details = (#{parent := Parent}) when Parent == <<"view.build">> -> [report] + +[tracing.database-info.read] + +select = (#{'http.method' := Method}) when Method == 'GET' -> [report] +details = (#{parent := Parent}) when Parent == <<"database-info.read">> -> [report] +``` + +Note: It is important to add `details = (#{parent := Parent}) when Parent == <<"database-info.read">> -> [report]` +rule if you want to report child spans. + + +Developing +========== + +Here we provide a list of frequently used commands +useful while working on this application. + + +1. Run all tests +``` +make setup-eunit +make && ERL_LIBS=`pwd`/src BUILDDIR=`pwd` mix test --trace src/chttpd/test/exunit/ src/ctrace/test/exunit/ +``` + +2. Run tests selectively +``` +make && ERL_LIBS=`pwd`/src BUILDDIR=`pwd` mix test --trace src/chttpd/test/exunit/ctrace_context_test.exs:59 +``` + +3. Re-run only failed tests +``` +make && ERL_LIBS=`pwd`/src BUILDDIR=`pwd` mix test --failed --trace src/chttpd/test/exunit/ src/ctrace/test/exunit/ +``` + +4. Running jaeger in docker +``` +docker run -d --net fdb-core --name jaeger.local -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:1.14 +``` \ No newline at end of file diff --git a/src/ctrace/src/ctrace.app.src b/src/ctrace/src/ctrace.app.src new file mode 100644 index 0000000..64f4fc5 --- /dev/null +++ b/src/ctrace/src/ctrace.app.src @@ -0,0 +1,27 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License.
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + {application, ctrace, [ + {description, "Open tracer API for CouchDB"}, + {vsn, git}, + {registered, [ + ]}, + {applications, [ + kernel, + stdlib, + syntax_tools, + config, + jaeger_passage, + passage + ]}, + {mod, {ctrace_app, []}} +]}. diff --git a/src/ctrace/src/ctrace.erl b/src/ctrace/src/ctrace.erl new file mode 100644 index 0000000..1b7bd88 --- /dev/null +++ b/src/ctrace/src/ctrace.erl @@ -0,0 +1,255 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ctrace). + +-vsn(1). + + +-export([ + is_enabled/0, + + start_span/1, + start_span/2, + finish_span/0, + finish_span/1, + with_span/2, + + set_operation_name/1, + add_tags/1, + log/1, + log/2, + + fun_to_op/1, + + get_operation_name/0, + get_tags/0, + get_refs/0, + get_trace_id/0, + get_span_id/0, + get_tracer/0, + get_context/0 +]). + + +-include_lib("passage/include/opentracing.hrl"). + +-define(ENABLED_KEY, '$ctrace_enabled$'). + + +-type tags() :: #{atom() => term()}. +-type log_fields() :: #{atom() => term()}. + + +-spec is_enabled() -> boolean(). 
+is_enabled() ->
+    % The configuration lookup is cached in the process dictionary so that
+    % each process consults ctrace_config at most once.
+    case get(?ENABLED_KEY) of
+        true -> true;
+        false -> false;
+        undefined ->
+            Result = ctrace_config:is_enabled(),
+            put(?ENABLED_KEY, Result),
+            Result
+    end.
+
+
+% Start a span with default options. When the operation name is
+% `undefined` the default tracer id is used as the operation name.
+-spec start_span(OperationName :: atom()) -> ok.
+
+start_span(undefined) ->
+    start_span(ctrace_config:default_tracer(), []);
+
+start_span(OperationName) ->
+    start_span(OperationName, []).
+
+
+% Start a span as the current span of the calling process. Unless the
+% caller supplies a `tracer` option the default tracer is used. If a span
+% was already active, its operation name is recorded in the `parent` tag
+% of the new span. Does nothing when tracing is disabled.
+-spec start_span(OperationName :: atom(), Options :: [term()]) -> ok.
+
+start_span(OperationName, Options0) ->
+    case is_enabled() of
+        true ->
+            CurrSpan = passage_pd:current_span(),
+            Options1 = case lists:keymember(tracer, 1, Options0) of
+                true -> Options0;
+                false -> [{tracer, ctrace_config:default_tracer()} | Options0]
+            end,
+            passage_pd:start_span(OperationName, Options1),
+            if CurrSpan == undefined -> ok; true ->
+                ParentOp = passage_span:get_operation_name(CurrSpan),
+                passage_pd:set_tags(#{parent => ParentOp})
+            end;
+        false ->
+            ok
+    end.
+
+
+% Finish the current span with default options.
+-spec finish_span() -> ok.
+
+finish_span() ->
+    finish_span([]).
+
+
+% Finish the current span. Does nothing when tracing is disabled.
+-spec finish_span(Options :: [term()]) -> ok.
+
+finish_span(Options) ->
+    case is_enabled() of
+        true ->
+            passage_pd:finish_span(Options);
+        _ ->
+            ok
+    end.
+
+
+% Run Fun inside a span named OperationName and return its result.
+% An exception raised by Fun is logged on the span and then re-raised
+% with the original class, reason and stacktrace.
+-spec with_span(OperationName :: atom(), Fun :: fun(() -> term())) -> term().
+
+with_span(OperationName, Fun) ->
+    case is_enabled() of
+        true ->
+            % Start the span before entering the try block: if starting
+            % fails, the `after` clause must not finish an enclosing span.
+            start_span(OperationName, []),
+            try
+                Fun()
+            catch Type:Reason ->
+                % NOTE(review): erlang:get_stacktrace/0 is deprecated since
+                % OTP 21; switch to `Type:Reason:Stack` once older releases
+                % no longer need to be supported.
+                Stack = erlang:get_stacktrace(),
+                log(#{
+                    ?LOG_FIELD_ERROR_KIND => Type,
+                    ?LOG_FIELD_MESSAGE => Reason,
+                    ?LOG_FIELD_STACK => Stack
+                }, [error]),
+                erlang:raise(Type, Reason, Stack)
+            after
+                finish_span()
+            end;
+        false ->
+            Fun()
+    end.
+
+
+% Rename the current span. Does nothing when tracing is disabled.
+-spec set_operation_name(OperationName :: atom()) -> ok.
+
+set_operation_name(OperationName) ->
+    case is_enabled() of
+        true ->
+            passage_pd:set_operation_name(OperationName);
+        _ ->
+            ok
+    end.
+
+
+% Add tags to the current span. Does nothing when tracing is disabled.
+-spec add_tags(Tags :: tags()) -> ok.
+
+add_tags(Tags) ->
+    case is_enabled() of
+        true ->
+            passage_pd:set_tags(Tags);
+        _ ->
+            ok
+    end.
+
+
+% Log on the current span with default options.
+-spec log(Fields :: log_fields() | fun(() -> log_fields())) -> ok.
+
+log(FieldsOrFun) ->
+    log(FieldsOrFun, []).
+
+
+% Log on the current span. The first argument is passed to passage_pd:log/2
+% unchanged. Does nothing when tracing is disabled.
+-spec log(
+        Fields :: log_fields() | fun(() -> log_fields()),
+        Options :: [term()]
+    ) -> ok.
+
+log(FieldsOrFun, Options) ->
+    case is_enabled() of
+        true ->
+            passage_pd:log(FieldsOrFun, Options);
+        false ->
+            ok
+    end.
+
+
+% Build an operation name of the form 'Module:Name/Arity' from a fun.
+% NOTE: this creates an atom for every distinct fun it is given.
+-spec fun_to_op(Fun :: fun()) -> atom().
+
+fun_to_op(Fun) ->
+    {module, M} = erlang:fun_info(Fun, module),
+    {name, F} = erlang:fun_info(Fun, name),
+    {arity, A} = erlang:fun_info(Fun, arity),
+    Str = io_lib:format("~s:~s/~b", [M, F, A]),
+    list_to_atom(lists:flatten(Str)).
+
+
+% The getters below return `undefined` when tracing is disabled.
+
+-spec get_tags() -> tags() | undefined.
+
+get_tags() ->
+    case is_enabled() of
+        true ->
+            passage_span:get_tags(passage_pd:current_span());
+        false ->
+            undefined
+    end.
+
+
+-spec get_refs() -> passage:refs() | undefined.
+
+get_refs() ->
+    case is_enabled() of
+        true ->
+            passage_span:get_refs(passage_pd:current_span());
+        false ->
+            undefined
+    end.
+
+
+-spec get_operation_name() -> atom().
+
+get_operation_name() ->
+    case is_enabled() of
+        true ->
+            passage_span:get_operation_name(passage_pd:current_span());
+        false ->
+            undefined
+    end.
+
+
+-spec get_trace_id() -> passage:trace_id() | undefined.
+
+get_trace_id() ->
+    case is_enabled() of
+        true ->
+            jaeger_passage_span_context:get_trace_id(get_context());
+        false ->
+            undefined
+    end.
+
+
+-spec get_span_id() -> passage:span_id() | undefined.
+
+get_span_id() ->
+    case is_enabled() of
+        true ->
+            jaeger_passage_span_context:get_span_id(get_context());
+        false ->
+            undefined
+    end.
+
+
+-spec get_tracer() -> passage:tracer_id() | undefined.
+
+get_tracer() ->
+    case is_enabled() of
+        true ->
+            passage_span:get_tracer(passage_pd:current_span());
+        false ->
+            undefined
+    end.
+
+
+-spec get_context() -> passage_span_context:context() | undefined.
+
+get_context() ->
+    case is_enabled() of
+        true ->
+            Span = passage_pd:current_span(),
+            passage_span:get_context(Span);
+        false ->
+            undefined
+    end.
diff --git a/src/ctrace/src/ctrace_action.erl b/src/ctrace/src/ctrace_action.erl new file mode 100644 index 0000000..28276b1 --- /dev/null +++ b/src/ctrace/src/ctrace_action.erl @@ -0,0 +1,38 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ctrace_action). + +-export([ + sample/1, + report/1 +]). + +-type action_fun() + :: fun((Span :: passage_span:span()) -> boolean()). + +-spec sample( + [SamplingRate :: float() | integer()] + ) -> action_fun(). + +sample([SamplingRate]) -> + fun(_Span) -> rand:uniform() < SamplingRate end. + +-spec report( + [ReporterId :: passage:tracer_id()] + ) -> action_fun(). + +report([TracerId]) -> + fun(Span) -> + jaeger_passage_reporter:report(TracerId, Span), + true + end. \ No newline at end of file diff --git a/src/ctrace/src/ctrace_app.erl b/src/ctrace/src/ctrace_app.erl new file mode 100644 index 0000000..c98b897 --- /dev/null +++ b/src/ctrace/src/ctrace_app.erl @@ -0,0 +1,26 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. 
+
+-module(ctrace_app).
+
+-behaviour(application).
+
+-export([
+    start/2,
+    stop/1
+]).
+
+% Application callback: start the ctrace supervision tree.
+start(_StartType, _StartArgs) ->
+    ctrace_sup:start_link().
+
+stop(_State) ->
+    ok.
diff --git a/src/ctrace/src/ctrace_config.erl b/src/ctrace/src/ctrace_config.erl
new file mode 100644
index 0000000..b8b45c8
--- /dev/null
+++ b/src/ctrace/src/ctrace_config.erl
@@ -0,0 +1,203 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ctrace_config).
+-behaviour(config_listener).
+-vsn(1).
+
+-export([
+    default_tracer/0,
+    is_enabled/0,
+    update/0
+]).
+
+-export([
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
+
+-define(MAIN_TRACER, jaeger_passage_reporter).
+
+
+% Id of the tracer that reports spans to the jaeger agent.
+-spec default_tracer() -> atom().
+
+default_tracer() ->
+    ?MAIN_TRACER.
+
+
+% Value of `[tracing] enabled`; tracing is off unless configured.
+-spec is_enabled() -> boolean().
+
+is_enabled() ->
+    config:get_boolean("tracing", "enabled", false).
+
+
+% Bring the tracers in line with the configuration: when tracing is
+% enabled make sure the main tracer runs and (re)load every configured
+% sampler, otherwise stop the main tracer.
+-spec update() -> ok.
+
+update() ->
+    case is_enabled() of
+        true ->
+            maybe_start_main_tracer(?MAIN_TRACER) andalso update_config();
+        false ->
+            jaeger_passage:stop_tracer(?MAIN_TRACER)
+    end,
+    ok.
+
+
+% config_listener callback. The "tracing.samplers" clause must stay ahead
+% of the generic "tracing." ++ OperationIdStr clause, which would otherwise
+% match it as well.
+handle_config_change("tracing.samplers", OperationIdStr, Value, _, St) ->
+    case is_enabled() of
+        true -> update_sampler(OperationIdStr, Value);
+        false -> ok
+    end,
+    {ok, St};
+handle_config_change("tracing." ++ OperationIdStr, _Key, _Val, _Persist, St) ->
+    case is_enabled() of
+        true -> update_sampler(OperationIdStr);
+        false -> ok
+    end,
+    {ok, St};
+handle_config_change("tracing", "enabled", _, _Persist, St) ->
+    update(),
+    {ok, St};
+handle_config_change(_Sec, _Key, _Val, _Persist, St) ->
+    {ok, St}.
+
+
+handle_config_terminate(_Server, _Reason, _State) ->
+    update().
+
+
+% Returns true when the tracer is already registered or was started
+% successfully, false when it could not be started.
+maybe_start_main_tracer(TracerId) ->
+    case passage_tracer_registry:get_reporter(TracerId) of
+        error ->
+            start_main_tracer(TracerId);
+        _ ->
+            true
+    end.
+
+
+% Start the jaeger tracer using the agent settings from the `[tracing]`
+% section. Returns true on success; on failure the reason is logged and
+% false is returned.
+start_main_tracer(TracerId) ->
+    Format = list_to_atom(config:get("tracing", "thrift_format", "compact")),
+    Host = config:get("tracing", "agent_host", "127.0.0.1"),
+    Port = config:get_integer("tracing", "agent_port", 6831),
+    Name = list_to_atom(config:get("tracing", "app_name", "couchdb")),
+
+    Sampler = passage_sampler_all:new(),
+    Options = [
+        {thrift_format, Format},
+        {agent_host, Host},
+        {agent_port, Port},
+        {default_service_name, Name}
+    ],
+
+    case jaeger_passage:start_tracer(TracerId, Sampler, Options) of
+        ok ->
+            true;
+        {error, Reason} ->
+            couch_log:error("Cannot start main tracer: ~p~n", [Reason]),
+            false
+    end.
+
+
+% (Re)load every sampler listed in `[tracing.samplers]`.
+update_config() ->
+    lists:foreach(fun({OperationIdStr, SamplerDef}) ->
+        update_sampler(OperationIdStr, SamplerDef)
+    end, config:get("tracing.samplers")).
+
+
+% The rules of an operation changed: reload it with its configured
+% sampler, or remove its tracer when no sampler is configured.
+update_sampler(OperationIdStr) when is_list(OperationIdStr) ->
+    case config:get("tracing.samplers", OperationIdStr) of
+        undefined ->
+            rem_tracer(OperationIdStr);
+        SamplerDef ->
+            update_sampler(OperationIdStr, SamplerDef)
+    end.
+
+
+update_sampler(OperationIdStr, deleted) ->
+    rem_tracer(OperationIdStr);
+
+update_sampler(OperationIdStr, SamplerDef) ->
+    case parse_sampler(SamplerDef) of
+        undefined ->
+            rem_tracer(OperationIdStr);
+        Sampler ->
+            % Register the tracer only when its rules compiled;
+            % compile_rules/1 has already removed it otherwise.
+            case compile_rules(OperationIdStr) of
+                ok -> add_tracer(OperationIdStr, Sampler);
+                error -> ok
+            end
+    end.
+
+
+add_tracer(OperationIdStr, Sampler) ->
+    OperationId = list_to_atom(OperationIdStr),
+    case passage_tracer_registry:get_reporter(OperationId) of
+        {ok, _} ->
+            % Only need to update the sampler here as the
+            % ctrace_filter will automatically update to use
+            % the recompiled dynamic module.
+            passage_tracer_registry:set_sampler(OperationId, Sampler);
+        error ->
+            Mod = filter_module_name(OperationIdStr),
+            CTraceFilter = ctrace_filter:new(OperationId, Mod),
+            Filter = passage_reporter:new(ctrace_filter, CTraceFilter),
+            Ctx = jaeger_passage_span_context,
+            passage_tracer_registry:register(OperationId, Ctx, Sampler, Filter)
+    end.
+
+
+rem_tracer(OperationIdStr) ->
+    OperationId = list_to_atom(OperationIdStr),
+    passage_tracer_registry:deregister(OperationId).
+
+
+% Parse the rules from `[tracing.<OperationId>]` and compile them into the
+% operation's filter module. Returns `ok` on success. On failure the
+% tracer is removed, the reason is logged and `error` is returned.
+compile_rules(OperationIdStr) ->
+    FilterMod = filter_module_name(OperationIdStr),
+    RulesRaw = config:get("tracing." ++ OperationIdStr),
+    try
+        Rules = lists:map(fun({Name, RuleDef}) ->
+            Rule = ctrace_dsl:parse_rule(Name, RuleDef),
+            RawActions = maps:get(actions, Rule),
+            Actions = lists:map(fun set_action/1, RawActions),
+            maps:put(actions, Actions, Rule)
+        end, RulesRaw),
+        case ctrace_dsl:compile(FilterMod, Rules) of
+            error ->
+                throw({error, "generated filter module failed to compile"});
+            _ ->
+                ok
+        end
+    catch throw:{error, Reason} ->
+        rem_tracer(OperationIdStr),
+        couch_log:error("cannot compile '~s': ~p~n", [OperationIdStr, Reason]),
+        error
+    end.
+
+
+% Turn a sampler definition ("all", "null" or a float such as "0.25") into
+% a sampler. Returns `undefined`, after logging, for anything else.
+parse_sampler(Binary) when is_binary(Binary) ->
+    parse_sampler(binary_to_list(Binary));
+parse_sampler("all") ->
+    ctrace_sampler:all();
+parse_sampler("null") ->
+    ctrace_sampler:null();
+parse_sampler(FloatStr) ->
+    Help = "Cannot parse sampler. The only supported formats are: "
+        " all | null | float(), got '~s'",
+    try
+        % FloatStr is always a list here (binaries are converted above),
+        % so list_to_float/1 is the matching conversion.
+        ctrace_sampler:probalistic(list_to_float(FloatStr))
+    catch _:_ ->
+        couch_log:error(Help, [FloatStr]),
+        undefined
+    end.
+
+
+% Map a parsed DSL action onto the {Module, Function, Args} that
+% implements it.
+set_action({sample, Rate}) ->
+    {ctrace_action, sample, [Rate]};
+set_action(report) ->
+    {ctrace_action, report, [?MAIN_TRACER]}.
+ + +filter_module_name(OperationIdStr) -> + list_to_atom("ctrace_filter_" ++ OperationIdStr). diff --git a/src/ctrace/src/ctrace_dsl.erl b/src/ctrace/src/ctrace_dsl.erl new file mode 100644 index 0000000..efe9545 --- /dev/null +++ b/src/ctrace/src/ctrace_dsl.erl @@ -0,0 +1,202 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(ctrace_dsl). +-include_lib("syntax_tools/include/merl.hrl"). + +-export([ + parse_rule/2, + generate/2, + compile/2, + source/1, + print_source/1 +]). + +-type ast() + :: erl_syntax:syntaxTree(). + +-type bindings() + :: map(). + +-type action() + :: report + | {sample, integer} + | {sample, float}. + +-type rule() + :: #{ + name := atom(), + args := ast(), + conditions := ast(), + bindings := bindings(), + actions := [action()], + source := string() + }. + +-spec parse_rule( + FunName :: string(), + String :: string() + ) -> rule(). 
+
+% Parse one rule, written as the head, guard and body of an Erlang
+% function clause, into a rule() map. FunName supplies the function name
+% that is prepended to the rule text before quoting. Throws
+% {error, Message} (via fail/1) when the text does not have the expected
+% shape.
+parse_rule(FunName, String) ->
+    AST = merl:quote(FunName ++ String ++ "."),
+    case AST of
+        ?Q("'@Name'(_@Args) when _@__@Guard -> [_@@Actions].")
+                when erl_syntax:type(Args) == map_expr ->
+            #{
+                name => erl_syntax:atom_value(Name),
+                args => Args,
+                conditions => Guard,
+                bindings => bindings(Args),
+                actions => parse_actions(Actions),
+                source => String
+            };
+        ?Q("'@Name'(_@Args) when _@__@Guard -> _@@_.")
+                when erl_syntax:type(Args) == map_expr ->
+            fail("Function body should be a list of actions");
+        ?Q("'@Name'(_@Args) when _@__@Guard -> _@@_.") ->
+            fail("The only argument of the filter should be map");
+        ?Q("'@Name'(_@@Args) when _@__@Guard -> _@@_.") ->
+            fail("The arrity of the filter function should be 1");
+        _ ->
+            fail("Unknown shape of a filter function")
+    end.
+
+% Collect the `field := Var` pairs of the map pattern into a
+% #{VariableName => FieldName} map. Fails when a field is not an exact
+% match, a key is not an atom, a value is not a variable, or a variable
+% is used twice.
+-spec bindings(
+        MapAST :: ast()
+    ) -> bindings().
+
+bindings(MapAST) ->
+    %% Unfortunatelly merl doesn't seem to support maps
+    %% so we had to do it manually
+    lists:foldl(fun(AST, Bindings) ->
+        erl_syntax:type(AST) == map_field_exact
+            orelse fail("only #{field := Var} syntax is supported in the header"),
+        NameAST = erl_syntax:map_field_exact_name(AST),
+        erl_syntax:type(NameAST) == atom
+            orelse fail("only atoms are supported as field names in the header"),
+        Name = erl_syntax:atom_value(NameAST),
+        VarAST = erl_syntax:map_field_exact_value(AST),
+        erl_syntax:type(VarAST) == variable
+            orelse fail("only Capitalized names are supported as matching variables in the header"),
+        Var = erl_syntax:variable_name(VarAST),
+        maps:is_key(Var, Bindings)
+            andalso fail(lists:flatten(
+                io_lib:format("'~s' variable is already in use", [Var]))),
+        Bindings#{Var => Name}
+    end, #{}, erl_syntax:map_expr_fields(MapAST)).
+
+-spec parse_actions(
+        Actions :: [ast()]
+    ) -> [action()].
+
+parse_actions(Actions) ->
+    lists:map(fun(ActionAST) ->
+        parse_action(ActionAST)
+    end, Actions).
+
+% Recognise `report` and `sample(Number)`; anything else fails.
+-spec parse_action(
+        Actions :: ast()
+    ) -> action().
+
+parse_action(ActionAST) ->
+    case ActionAST of
+        ?Q("report") ->
+            report;
+        ?Q("sample(_@AST)") when erl_syntax:type(AST) == integer ->
+            {sample, erl_syntax:integer_value(AST)};
+        ?Q("sample(_@AST)") when erl_syntax:type(AST) == float ->
+            {sample, erl_syntax:float_value(AST)};
+        ?Q("sample(_@AST)") ->
+            fail("expecting `integer | float` in `sample` action");
+        _ ->
+            fail(lists:flatten(io_lib:format(
+                "unsuported action '~s'", [erl_prettypr:format(ActionAST)])))
+    end.
+
+% Build the forms of a module exporting match/1: one clause per rule,
+% ordered by rule name, followed by a catch-all clause returning false.
+-spec generate(
+        ModuleName :: module(),
+        Rules :: [rule()]
+    ) -> [ast()].
+
+generate(ModuleName, Rules) ->
+    Module = ?Q("-module('@ModuleName@')."),
+    Export = ?Q("-export([match/1])."),
+    Ordered = order_rules(Rules),
+    Functions = [
+        erl_syntax:function(merl:term(match), [
+            function_clause(Rule)
+        || Rule <- Ordered] ++ [?Q("(_) -> false")])
+    ],
+    lists:flatten([Module, Export, Functions]).
+
+-spec order_rules(
+        [rule()]
+    ) -> [rule()].
+
+order_rules(Rules) ->
+    % lists:sort/2 expects an "A =< B" ordering fun; with `=<` rules
+    % that share a name keep their input order.
+    lists:sort(fun(RuleA, RuleB) ->
+        maps:get(name, RuleA) =< maps:get(name, RuleB)
+    end, Rules).
+
+% Pretty-print a list of forms as Erlang source text.
+-spec source(
+        Forms :: [ast()]
+    ) -> string().
+
+source(Forms) ->
+    erl_prettypr:format(
+        erl_syntax:form_list(Forms),
+        [{paper,160},{ribbon,80}]).
+
+-spec print_source(
+        [ast()]
+    ) -> ok.
+
+print_source(Forms) ->
+    % Pass the source as data: it may contain `~`, which must not be
+    % interpreted as a format directive.
+    io:format("~ts~n", [source(Forms)]).
+
+-spec function_clause(
+        rule()
+    ) -> ast().
+
+function_clause(Rule) ->
+    #{
+        args := Args,
+        conditions := Guard,
+        actions := Actions
+    } = Rule,
+    ActionsAST = actions(Actions),
+    ?Q("(_@Args) when _@__@Guard -> _@ActionsAST").
+
+% Build the list expression a clause returns: one
+% `Module:Function(Args)` call per action.
+-spec actions(
+        [mfa()]
+    ) -> ast().
+
+actions(Actions) ->
+    erl_syntax:list(lists:map(fun({Module, Function, Args}) ->
+        %% keep in mind that implementation function would
+        %% receive a list of arguments
+        %% i.e. you would need to implement it as
+        %% function_name([Arg1, Arg2]) -> fun(Span) -> true end).
+        ?Q("'@Module@':'@Function@'(_@@Args@)")
+    end, Actions)).
%% Generates the filter module `Module` from `Rules` (see generate/2) and
%% loads it into the running VM. The result is whatever
%% merl:compile_and_load/2 returns.
-spec compile(
        Module :: module(),
        Rules :: [rule()]
    ) -> term().

compile(Module, Rules) ->
    Forms = generate(Module, Rules),
    merl:compile_and_load(Forms, [verbose]).

%% Aborts the current operation by throwing `{error, Msg}`.
fail(Msg) ->
    throw({error, Msg}).
diff --git a/src/ctrace/src/ctrace_filter.erl b/src/ctrace/src/ctrace_filter.erl
new file mode 100644
index 0000000..0feefcf
--- /dev/null
+++ b/src/ctrace/src/ctrace_filter.erl
@@ -0,0 +1,45 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(ctrace_filter).
-include_lib("passage/include/opentracing.hrl").

-behaviour(passage_reporter).

-export([
    new/2,
    report/2,
    module/1
]).


%% Creates the reporter state: the operation id paired with the name of the
%% module whose match/1 decides what happens to a span.
new(OperationId, Module) ->
    {OperationId, Module}.


%% Reporter callback. Passes the span's tags to `Module:match/1`:
%%   * `false`   - no rule matched, nothing is done;
%%   * `Actions` - a list of funs, each called with the span, in order,
%%                 until one of them returns `false`.
%% Always returns `ok`.
report({_OperationId, Module}, PSpan) ->
    Tags = passage_span:get_tags(PSpan),
    try
        case Module:match(Tags) of
            false ->
                ok;
            Actions ->
                %% lists:all/2 stops at the first action returning `false`,
                %% which is the only thing the result was used for.
                _ = lists:all(fun(Action) -> Action(PSpan) end, Actions),
                ok
        end
    catch error:undef ->
        %% Best effort: presumably covers a filter module that is not
        %% (yet) loaded. Because the whole call is wrapped, an `undef`
        %% raised from inside an action is silenced here as well.
        ok
    end.


%% Returns the filter module stored in the reporter state.
module({_OperationId, Module}) ->
    Module.
diff --git a/src/ctrace/src/ctrace_sampler.erl b/src/ctrace/src/ctrace_sampler.erl
new file mode 100644
index 0000000..7262557
--- /dev/null
+++ b/src/ctrace/src/ctrace_sampler.erl
@@ -0,0 +1,36 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(ctrace_sampler).

-export([
    all/0,
    null/0,
    probalistic/1
]).

%% Sampler created by passage_sampler_all:new/0.
-spec all() -> passage_sampler:sampler().

all() ->
    passage_sampler_all:new().

%% Sampler created by passage_sampler_null:new/0.
-spec null() -> passage_sampler:sampler().

null() ->
    passage_sampler_null:new().

%% Sampler created by passage_sampler_probalistic:new/1 for the given rate.
%% The DSL accepts integer literals in `sample(...)` (see
%% ctrace_dsl:parse_action/1), so an integer rate is converted to a float
%% first; this keeps the value handed to passage within the float()
%% contract this function has always declared.
-spec probalistic(
        Rate :: float() | integer()
    ) -> passage_sampler:sampler().

probalistic(SamplingRate) when is_integer(SamplingRate) ->
    probalistic(float(SamplingRate));
probalistic(SamplingRate) ->
    passage_sampler_probalistic:new(SamplingRate).
diff --git a/src/ctrace/src/ctrace_sup.erl b/src/ctrace/src/ctrace_sup.erl
new file mode 100644
index 0000000..0a1e50b
--- /dev/null
+++ b/src/ctrace/src/ctrace_sup.erl
@@ -0,0 +1,43 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(ctrace_sup).
-behaviour(supervisor).
-vsn(1).

-export([
    start_link/0,
    init/1
]).


%% Starts the supervisor registered locally as ?MODULE.
%% ctrace_config:update/0 is called first, i.e. before any child exists.
start_link() ->
    ctrace_config:update(),
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% Supervisor callback.
%% Strategy is one_for_one with intensity 5 and period 10. The only child is
%% a permanent worker started through
%% config_listener_mon:start_link(ctrace_config, nil).
init([]) ->
    Flags = #{
        strategy => one_for_one,
        intensity => 5,
        period => 10
    },
    Children = [
        #{
            id => config_listener_mon,
            type => worker,
            restart => permanent,
            shutdown => 5000,
            start => {config_listener_mon, start_link, [ctrace_config, nil]}
        }
    ],
    {ok, {Flags, Children}}.
diff --git a/src/ctrace/test/exunit/ctrace_config_test.exs b/src/ctrace/test/exunit/ctrace_config_test.exs
new file mode 100644
index 0000000..408497a
--- /dev/null
+++ b/src/ctrace/test/exunit/ctrace_config_test.exs
@@ -0,0 +1,153 @@
defmodule Couch.CTrace.Config.Test do
  @moduledoc """
  Checks how ctrace reacts to its configuration: which reporters are running
  and which rules the generated filter module contains.
  """

  require Logger
  use ExUnit.Case
  @moduletag capture_log: true

  # Starts ctrace with one sampler ("all-docs") whose single rule matches GET
  # requests, waits for its reporter and hands the filter module to the tests.
  setup do
    apps = :test_util.start_applications([:ctrace])
    :meck.new(:couch_log, [{:stub_all, :meck.val(:ok)}])

    on_exit(fn ->
      :test_util.stop_applications(apps)
      :meck.unload()
    end)

    :config.set('tracing.samplers', 'all-docs', 'all', false)

    :config.set(
      'tracing.all-docs',
      'all',
      ~C"(#{method := M}) when M == get -> []",
      false
    )

    :config.set_boolean('tracing', 'enabled', true, false)

    {:ok, reporter} =
      wait_non_error(fn ->
        :passage_tracer_registry.get_reporter(:"all-docs")
      end)

    filter = :passage_reporter.get_state(reporter)
    %{filter: :ctrace_filter.module(filter)}
  end

  describe "Supervision tree :" do
    test "main jaeger reporter is started" do
      assert match?(
               {:ok, _},
               :passage_tracer_registry.get_reporter(:jaeger_passage_reporter)
             )
    end

    test "pre-configured reporter is started" do
      assert match?(
               {:ok, _},
               :passage_tracer_registry.get_reporter(:"all-docs")
             )
    end

    test "reporter is started on config change" do
      :config.set('tracing.samplers', 'bulk', 'all', false)
      :config.set('tracing.bulk', 'all', ~C"(#{}) -> [report]", false)

      # The wait helper yields :timeout when the reporter never shows up.
      # :timeout is truthy, so a bare `assert` on the result could not fail;
      # match on the success shape instead.
      assert {:ok, _} =
               wait_non_error(fn ->
                 :passage_tracer_registry.get_reporter(:bulk)
               end)
    end

    test "reporter is stopped when deleted" do
      assert wait_non_error(fn ->
               :passage_tracer_registry.get_reporter(:"all-docs")
             end) != :timeout

      :config.delete('tracing.samplers', 'all-docs', false)

      assert wait_error(fn ->
               :passage_tracer_registry.get_reporter(:"all-docs")
             end) != :timeout
    end
  end

  describe "Configuration :" do
    test "recompile rules on config update", %{filter: module} do
      assert match?([], module.match(%{method: :get}))
      assert match?(false, module.match(%{method: :post}))

      :config.set(
        'tracing.all-docs',
        'all',
        ~C"(#{method := M}) when M == post -> []",
        false
      )

      # Wait until GET stops matching, i.e. the module has been recompiled.
      assert match?(
               false,
               :test_util.wait_other_value(
                 fn ->
                   module.match(%{method: :get})
                 end,
                 []
               )
             )

      assert match?([], module.match(%{method: :post}))
    end

    test "log errors", %{filter: _} do
      :config.set('tracing.all-docs', 'all', ~C"( -> syntax_error", false)
      :ctrace_config.update()

      [error | _] =
        :test_util.wait_other_value(
          fn ->
            capture_logs(:error, ~r"cannot compile '")
          end,
          []
        )

      assert [:"all-docs", '1: syntax error before: \'->\''] == error
    end
  end

  describe "Matching :" do
    test "should match", %{filter: module} do
      assert match?([], module.match(%{method: :get}))
    end

    test "should not match", %{filter: module} do
      assert match?(false, module.match(%{method: :post}))
    end
  end

  # Polls `fun` until it returns :error.
  defp wait_error(fun) do
    :test_util.wait_value(fun, :error)
  end

  # Polls `fun` until it returns something other than :error.
  defp wait_non_error(fun) do
    :test_util.wait_other_value(fun, :error)
  end

  # Returns the argument lists of all two-argument `couch_log.<level>` calls
  # whose message matches `regexp`.
  defp capture_logs(level, regexp) do
    history(:couch_log, level, 2)
    |> Enum.flat_map(fn event ->
      {_, _, [msg, args]} = elem(event, 1)

      if Regex.match?(regexp, List.to_string(msg)) do
        [args]
      else
        []
      end
    end)
  end

  # Recorded meck calls of `module` restricted to `function/arity`.
  defp history(module, function, arity) do
    history = :meck.history(module)

    history
    |> Enum.filter(fn event ->
      {_, fun, args} = elem(event, 1)
      function == fun and arity == length(args)
    end)
  end
end
diff --git a/src/ctrace/test/exunit/ctrace_dsl_test.exs b/src/ctrace/test/exunit/ctrace_dsl_test.exs
new file mode 100644
index 0000000..f5e2947
--- /dev/null
+++ b/src/ctrace/test/exunit/ctrace_dsl_test.exs
@@ -0,0 +1,157 @@
defmodule Couch.CTrace.DSL.Test do
  @moduledoc """
  Tests for the rule DSL: parsing of rule strings, error reporting and
  generation/compilation of the filter module.
  """

  require Logger
  use ExUnit.Case, async: true
  @filter_module List.to_atom(Atom.to_charlist(__MODULE__) ++ '_filter')
  @moduletag capture_log: true

  describe "DSL roundtrip :" do
    test "Simple Parse and Compile" do
      rule_str = ~C"""
      (#{'http.method' := Method}) when Method == get -> [sample(1.0)]
      """

      rule = :ctrace_dsl.parse_rule('get', rule_str)
      rule = set_actions(rule)
      ast = generate([rule])
      # Without looking at the result a failed compilation would go unnoticed.
      assert {:ok, _} = :merl.compile_and_load(ast, [:verbose])
    end
  end

  describe "DSL compiler :" do
    test "match clauses are in alphabetical order" do
      rule_str_a = ~C"(#{foo := A}) when A == 1 -> [sample(1)]"
      rule_str_b = ~C"(#{foo := B}) when B == 2 -> [sample(2)]"
      rule_a = :ctrace_dsl.parse_rule('a', rule_str_a)
      rule_b = :ctrace_dsl.parse_rule('b', rule_str_b)
      set_action = fn {:sample, rate} -> {__MODULE__, :as_is, [rate]} end
      rule_a = set_actions(rule_a, set_action)
      rule_b = set_actions(rule_b, set_action)

      # Whatever order the rules are supplied in, the result must be the same.
      ast = generate([rule_b, rule_a])
      assert {:ok, _} = :merl.compile_and_load(ast, [:verbose])
      assert match?({[[1]], [[2]]}, {match(%{foo: 1}), match(%{foo: 2})})
      :code.delete(@filter_module)
      :code.purge(@filter_module)

      ast = generate([rule_a, rule_b])
      assert {:ok, _} = :merl.compile_and_load(ast, [:verbose])
      assert match?({[[1]], [[2]]}, {match(%{foo: 1}), match(%{foo: 2})})
    end
  end

  describe "DSL parsing :" do
    test "empty map" do
      rule_str = ~C"(#{}) -> [report]"
      assert match?(%{}, parse(rule_str))
    end

    test "empty actions" do
      rule_str = ~C"(#{}) -> []"
      assert match?(%{}, parse(rule_str))
    end
  end

  describe "DSL parsing error handling :" do
    test "body is not a list" do
      rule_str = ~C"(#{}) -> hello"
      error_str = 'Function body should be a list of actions'

      assert catch_throw(parse(rule_str)) == {:error, error_str}
    end

    test "body contains calls" do
      rule_str = ~C"(#{}) -> [module:function()]"
      error_str = ~C"unsuported action 'module:function()'"

      assert catch_throw(parse(rule_str)) == {:error, error_str}
    end

    test "less than one argument" do
      rule_str = ~C"() -> [report]"
      error_str = ~C"The arrity of the filter function should be 1"

      assert catch_throw(parse(rule_str)) == {:error, error_str}
    end

    test "more than one argument" do
      rule_str = ~C"(#{}, foo) -> [report]"
      error_str = ~C"The arrity of the filter function should be 1"

      assert catch_throw(parse(rule_str)) == {:error, error_str}
    end

    test "argument is not a map (atom)" do
      rule_str = ~C"(atom) -> [report]"
      error_str = ~C"The only argument of the filter should be map"

      assert catch_throw(parse(rule_str)) == {:error, error_str}
    end

    test "argument is not a map (list)" do
      rule_str = ~C"([atom]) -> [report]"
      error_str = ~C"The only argument of the filter should be map"

      assert catch_throw(parse(rule_str)) == {:error, error_str}
    end

    test "argument is not a map (integer)" do
      rule_str = ~C"(1) -> [report]"
      error_str = ~C"The only argument of the filter should be map"

      assert catch_throw(parse(rule_str)) == {:error, error_str}
    end

    test "argument is not a map (float)" do
      rule_str = ~C"(1.0) -> [report]"
      error_str = ~C"The only argument of the filter should be map"

      assert catch_throw(parse(rule_str)) == {:error, error_str}
    end
  end

  # Parses `rule` under the fixed rule name 'test'.
  defp parse(rule) do
    :ctrace_dsl.parse_rule('test', rule)
  end

  # Replaces the parsed actions of `rule` with {module, function, args}
  # triples pointing at the stub functions defined in this module.
  defp set_actions(%{} = rule) do
    set_actions(rule, &set_action/1)
  end

  defp set_actions(%{:actions => actions} = rule, map_fun) do
    actions =
      actions
      |> Enum.map(map_fun)

    %{rule | actions: actions}
  end

  defp set_action({:sample, rate}) do
    {__MODULE__, :sample, [rate]}
  end

  defp set_action(:report) do
    {__MODULE__, :report, []}
  end

  # The functions below are public because the generated filter module
  # calls them by module name.

  def as_is(arg) do
    arg
  end

  def sample(_rate) do
    fn _ -> true end
  end

  def report() do
    fn _ -> true end
  end

  def generate(rules) do
    ast = :ctrace_dsl.generate(@filter_module, rules)
    Logger.debug(fn -> "Generated module:\n#{:ctrace_dsl.source(ast)}\n" end)
    ast
  end

  def match(tags) do
    @filter_module.match(tags)
  end
end
diff --git a/src/ctrace/test/exunit/ctrace_test.exs b/src/ctrace/test/exunit/ctrace_test.exs
new file mode 100644
index 0000000..cc46a40
--- /dev/null
+++ b/src/ctrace/test/exunit/ctrace_test.exs
@@ -0,0 +1,88 @@
defmodule Couch.CTrace.Test do
  @moduledoc """
  Checks that finished spans reach the (mocked) jaeger reporter.
  """

  require Logger
  use ExUnit.Case
  @moduletag capture_log: true

  # Starts ctrace with an "all-docs" sampler whose only rule reports every
  # span. `jaeger_passage_reporter.report/2` is replaced by a stub so that
  # the calls can be counted through the meck history.
  setup do
    apps = :test_util.start_applications([:ctrace])
    :meck.new(:couch_log, [{:stub_all, :meck.val(:ok)}])
    :meck.new(:jaeger_passage_reporter, [:passthrough])
    :meck.expect(:jaeger_passage_reporter, :report, fn _, _ -> :ok end)

    on_exit(fn ->
      :test_util.stop_applications(apps)
      :meck.unload()
    end)

    :config.set('tracing.samplers', 'all-docs', 'all', false)
    :config.set('tracing.all-docs', 'all', ~C"(#{}) -> [report]", false)
    :config.set_boolean('tracing', 'enabled', true, false)

    {:ok, reporter} =
      :test_util.wait_other_value(
        fn ->
          :passage_tracer_registry.get_reporter(:"all-docs")
        end,
        :error
      )

    filter = :passage_reporter.get_state(reporter)
    %{filter: :ctrace_filter.module(filter)}
  end

  describe "Basic : " do
    test "spans are reported" do
      :ctrace.start_span(:"all-docs")
      :ctrace.finish_span()

      assert length(reports()) == 1
    end

    test "child spans are reported" do
      :ctrace.start_span(:"all-docs")
      :ctrace.start_span(:"child-span")
      :ctrace.finish_span()
      :ctrace.finish_span()

      assert length(reports()) == 2
    end
  end

  # Argument lists of every recorded `jaeger_passage_reporter.report` call.
  # (The unused private helpers capture_logs/2 and history/3 were dropped:
  # nothing in this module called them and they only caused warnings.)
  defp reports() do
    :jaeger_passage_reporter
    |> :meck.history()
    |> Enum.flat_map(fn event ->
      case elem(event, 1) do
        {_, :report, args} -> [args]
        _ -> []
      end
    end)
  end
end
diff --git a/src/ctrace/test/exunit/test_helper.exs b/src/ctrace/test/exunit/test_helper.exs
new file mode 100644
index 0000000..3140500
--- /dev/null
+++ b/src/ctrace/test/exunit/test_helper.exs
@@ -0,0 +1,2 @@
ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
ExUnit.start()
