This is an automated email from the ASF dual-hosted git repository. dlive pushed a commit to branch 0.4.0 in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git
commit a5a0cde2469b17b3fc2e710de8c395ac97b8d2c1 Author: DLive <[email protected]> AuthorDate: Wed Jun 26 18:05:16 2019 +0800 redesign: invoker process --- src/dubbo_invoker.erl | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/dubboerl_app.erl | 16 ++++++++-- 2 files changed, 95 insertions(+), 4 deletions(-) diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker.erl index 0a3527b..f01cec4 100644 --- a/src/dubbo_invoker.erl +++ b/src/dubbo_invoker.erl @@ -16,8 +16,89 @@ %%------------------------------------------------------------------------------ -module(dubbo_invoker). +-include("dubbo.hrl"). %% API -export([]). --callback(invoke(Invoker,Invocation) -> ok). \ No newline at end of file +-callback(invoke(Invoker,Invocation) -> ok). + + +%% API +-export([invoke_request/2, invoke_request/3, invoke_request/5]). + +-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) -> + {ok, reference()}| + {ok, reference(), Data :: any(), RpcContent :: list()}| + {error, Reason :: timeout|no_provider|any()}. +invoke_request(Interface, Request) -> + invoke_request(Interface, Request, [], #{}, self()). + +-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RequestOption :: map()) -> + {ok, reference()}| + {ok, reference(), Data :: any(), RpcContent :: list()}| + {error, Reason :: timeout|no_provider|any()}. +invoke_request(Interface, Request, RequestOption) -> + invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()). + + +-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) -> + {ok, reference()}| + {ok, reference(), Data :: any(), RpcContent :: list()}| + {error, Reason :: timeout|no_provider|request_full|any()}. +invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) -> + case dubbo_provider_consumer_reg_table:select_connection(Interface, Request#dubbo_request.mid) of + {ok, #connection_info{pid = Pid, host_flag = HostFlag}} -> + case dubbo_traffic_control:check_goon(HostFlag, 199) of + ok -> + Request2 = merge_attachments(Request, RpcContext), + {ok, RequestData} = dubbo_codec:encode_request(Request2), + Ref = get_ref(RequestState), + gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}), + case is_sync(RequestState) of + true -> + sync_receive(Ref, get_timeout(RequestState)); + false -> {ok, Ref} + end; + full -> + {error, request_full} + end; + {error, none} -> + logger:error("[INVOKE] ~p error Reason no_provider", [Interface]), + {error, no_provider} + end. + + +is_sync(Option) -> + maps:is_key(sync, Option). +get_ref(Option) -> + maps:get(ref, Option, make_ref()). + +get_timeout(Option) -> + maps:get(timeout, Option, ?REQUEST_TIME_OUT). + + +sync_receive(Ref, TimeOut) -> + receive + {'$gen_cast', {response_process, Ref, RpcContent, Response}} -> + {ok, Ref, Response, RpcContent} + after + TimeOut -> + {error, timeout} + end. +merge_attachments(#dubbo_request{data = null} = Request, _Option) -> + Request; +merge_attachments(Request, Option) -> + Attachements = Request#dubbo_request.data#dubbo_rpc_invocation.attachments, + case lists:keyfind(attachments, 1, Option) of + false -> OptionAttachments = []; + {attachments, OptionAttachments} -> + OptionAttachments + end, + List = [ + {<<"version">>, <<"0.0.0">>}, + {<<"timeout">>, <<"5000">>} + ], + Attachements2 = lists:merge3(Attachements, OptionAttachments, List), + Data2 = Request#dubbo_request.data#dubbo_rpc_invocation{attachments = Attachements2}, + Request#dubbo_request{data = Data2}. diff --git a/src/dubboerl_app.erl b/src/dubboerl_app.erl index 223e842..0f51768 100644 --- a/src/dubboerl_app.erl +++ b/src/dubboerl_app.erl @@ -27,9 +27,14 @@ %%==================================================================== start(_StartType, _StartArgs) -> - io:format("[START] dubbo framework server start~n"), -%% env_init(), - dubboerl_sup:start_link(). + logger:info("[START] dubbo framework server start"), + case dubboerl_sup:start_link() of + {ok,Pid} -> + init_default_hooks(), + {ok,Pid}; + Result -> + Result + end. %%-------------------------------------------------------------------- stop(_State) -> @@ -38,6 +43,11 @@ stop(_State) -> %%==================================================================== %% Internal functions %%==================================================================== +init_default_hooks()-> + dubbo_extension:register(protocol,dubbo_protocol_dubbo,10), + dubbo_extension:register(protocol_wapper,dubbo_protocol_registry,10), + + ok. env_init() -> ets:new(?PROVIDER_IMPL_TABLE, [public, named_table]), dubbo_traffic_control:init(),
