This is an automated email from the ASF dual-hosted git repository.

dlive pushed a commit to branch 0.4.0
in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git

commit a5a0cde2469b17b3fc2e710de8c395ac97b8d2c1
Author: DLive <[email protected]>
AuthorDate: Wed Jun 26 18:05:16 2019 +0800

    redesign: invoker process
---
 src/dubbo_invoker.erl | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/dubboerl_app.erl  | 16 ++++++++--
 2 files changed, 95 insertions(+), 4 deletions(-)

diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker.erl
index 0a3527b..f01cec4 100644
--- a/src/dubbo_invoker.erl
+++ b/src/dubbo_invoker.erl
@@ -16,8 +16,89 @@
 
%%------------------------------------------------------------------------------
 -module(dubbo_invoker).
 
+-include("dubbo.hrl").
 %% API
 -export([]).
 
 
--callback(invoke(Invoker,Invocation) -> ok).
\ No newline at end of file
+-callback(invoke(Invoker,Invocation) -> ok).
+
+
+%% API
+-export([invoke_request/2, invoke_request/3, invoke_request/5]).
+
+%% @doc Invoke `Request' on `Interface' with an empty RPC context and an empty
+%% option map. The calling process is passed on as the callback process.
+-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) ->
+    {ok, reference()}|
+    {ok, reference(), Data :: any(), RpcContent :: list()}|
+    {error, Reason :: timeout|no_provider|any()}.
+invoke_request(Interface, Request) ->
+    %% With an empty option map invoke_request/3 resolves the context to [] and
+    %% forwards #{} and self(), i.e. exactly the five-argument call made before.
+    invoke_request(Interface, Request, #{}).
+
+%% @doc Invoke `Request' on `Interface' using the settings in `RequestOption'.
+%% The RPC context is read from the `ctx' key (default `[]'); the whole map is
+%% also forwarded as the request state and the caller becomes the callback pid.
+-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{},
+                     RequestOption :: map()) ->
+    {ok, reference()}|
+    {ok, reference(), Data :: any(), RpcContent :: list()}|
+    {error, Reason :: timeout|no_provider|any()}.
+invoke_request(Interface, Request, RequestOption) ->
+    RpcContext = maps:get(ctx, RequestOption, []),
+    CallBackPid = self(),
+    invoke_request(Interface, Request, RpcContext, RequestOption, CallBackPid).
+
+
+%% @doc Send `Request' for `Interface' over a selected provider connection.
+%%
+%% A connection is selected from the interface and the request's `mid'. If
+%% traffic control lets the call through, the request is sent to the connection
+%% process. When the request state contains the `sync' key the caller then
+%% waits for the response; otherwise `{ok, Ref}' is returned immediately.
+%%
+%% Returns `{error, no_provider}' when no connection is available and
+%% `{error, request_full}' when traffic control rejects the call.
+-spec invoke_request(Interface :: binary(), Request :: #dubbo_request{},
+                     RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) ->
+    {ok, reference()}|
+    {ok, reference(), Data :: any(), RpcContent :: list()}|
+    {error, Reason :: timeout|no_provider|request_full|any()}.
+invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) ->
+    Mid = Request#dubbo_request.mid,
+    case dubbo_provider_consumer_reg_table:select_connection(Interface, Mid) of
+        {error, none} ->
+            logger:error("[INVOKE] ~p error Reason no_provider", [Interface]),
+            {error, no_provider};
+        {ok, #connection_info{pid = ConnPid, host_flag = HostFlag}} ->
+            send_if_allowed(ConnPid, HostFlag, Request, RpcContext, RequestState, CallBackPid)
+    end.
+
+%% Ask traffic control whether the host may take another request and only then
+%% send it. NOTE(review): the meaning of the literal 199 is not visible here;
+%% confirm against dubbo_traffic_control:check_goon/2.
+send_if_allowed(ConnPid, HostFlag, Request, RpcContext, RequestState, CallBackPid) ->
+    case dubbo_traffic_control:check_goon(HostFlag, 199) of
+        full ->
+            {error, request_full};
+        ok ->
+            send_and_collect(ConnPid, Request, RpcContext, RequestState, CallBackPid)
+    end.
+
+%% Merge attachments, encode the request, cast it to the connection process and,
+%% for synchronous calls, wait for the matching response.
+send_and_collect(ConnPid, Request, RpcContext, RequestState, CallBackPid) ->
+    Outgoing = merge_attachments(Request, RpcContext),
+    {ok, Encoded} = dubbo_codec:encode_request(Outgoing),
+    Ref = get_ref(RequestState),
+    Message = {send_request, Ref, Outgoing, Encoded, CallBackPid, RequestState},
+    gen_server:cast(ConnPid, Message),
+    case is_sync(RequestState) of
+        false ->
+            {ok, Ref};
+        true ->
+            sync_receive(Ref, get_timeout(RequestState))
+    end.
+
+
+%% @doc Tell whether the request options ask for a synchronous call.
+%% The call is synchronous when the `sync' key is present with any value other
+%% than `false'. Checking the value, not just the presence of the key, means an
+%% explicit `#{sync => false}' takes the asynchronous path instead of blocking.
+is_sync(Option) ->
+    maps:get(sync, Option, false) =/= false.
+%% @doc Return the reference stored under the `ref' key of the options, or a
+%% freshly created reference when the key is absent.
+get_ref(Option) ->
+    case maps:find(ref, Option) of
+        {ok, Ref} -> Ref;
+        error -> make_ref()
+    end.
+
+%% @doc Return the value stored under the `timeout' key of the options, falling
+%% back to the ?REQUEST_TIME_OUT macro when the key is absent.
+get_timeout(Option) ->
+    case maps:find(timeout, Option) of
+        {ok, TimeOut} -> TimeOut;
+        error -> ?REQUEST_TIME_OUT
+    end.
+
+
+%% @doc Block the calling process until the response tagged with `Ref' arrives.
+%% The response is expected as a gen_server cast message of the form
+%% `{response_process, Ref, RpcContent, Response}'. Returns
+%% `{ok, Ref, Response, RpcContent}', or `{error, timeout}' when nothing
+%% matching arrives within `TimeOut'.
+sync_receive(Ref, TimeOut) ->
+    receive
+        {'$gen_cast', {response_process, MsgRef, Context, Reply}} when MsgRef =:= Ref ->
+            {ok, Ref, Reply, Context}
+    after TimeOut ->
+        {error, timeout}
+    end.
+%% @doc Add call attachments to the invocation carried by `Request'.
+%%
+%% The resulting attachment list combines, as one sorted list:
+%%   * the attachments already present on the invocation,
+%%   * the list found under the `attachments' key of `Option' (if any),
+%%   * the built-in `version' and `timeout' entries.
+%% A request whose data is `null' is returned unchanged.
+merge_attachments(#dubbo_request{data = null} = Request, _Option) ->
+    %% No invocation to attach anything to.
+    Request;
+merge_attachments(Request, Option) ->
+    Invocation = Request#dubbo_request.data,
+    RequestAttachments = Invocation#dubbo_rpc_invocation.attachments,
+    OptionAttachments =
+        case lists:keyfind(attachments, 1, Option) of
+            {attachments, Found} -> Found;
+            false -> []
+        end,
+    DefaultAttachments = [
+        {<<"version">>, <<"0.0.0">>},
+        {<<"timeout">>, <<"5000">>}
+    ],
+    %% lists:merge3/3 requires every input to be sorted already. The default
+    %% list above is not, and caller-supplied lists are not guaranteed to be,
+    %% so sort each one to get the documented sorted result.
+    Merged = lists:merge3(
+        lists:sort(RequestAttachments),
+        lists:sort(OptionAttachments),
+        lists:sort(DefaultAttachments)
+    ),
+    Request#dubbo_request{data = Invocation#dubbo_rpc_invocation{attachments = Merged}}.
diff --git a/src/dubboerl_app.erl b/src/dubboerl_app.erl
index 223e842..0f51768 100644
--- a/src/dubboerl_app.erl
+++ b/src/dubboerl_app.erl
@@ -27,9 +27,14 @@
 %%====================================================================
 
 start(_StartType, _StartArgs) ->
-    io:format("[START] dubbo framework server start~n"),
-%%    env_init(),
-    dubboerl_sup:start_link().
+    logger:info("[START] dubbo framework server start"),
+    %% Register the default hooks only after the top-level supervisor has
+    %% started; any other start_link/0 result is returned to the caller as is.
+    case dubboerl_sup:start_link() of
+        {ok,Pid} ->
+            init_default_hooks(),
+            {ok,Pid};
+        Result ->
+            Result
+    end.
 
 %%--------------------------------------------------------------------
 stop(_State) ->
@@ -38,6 +43,11 @@ stop(_State) ->
 %%====================================================================
 %% Internal functions
 %%====================================================================
+%% Register the default extensions with dubbo_extension, in order.
+%% NOTE(review): the third element (10) is presumably a priority; confirm
+%% against dubbo_extension:register/3.
+init_default_hooks() ->
+    Defaults = [
+        {protocol, dubbo_protocol_dubbo, 10},
+        {protocol_wapper, dubbo_protocol_registry, 10}
+    ],
+    lists:foreach(
+        fun({Key, Impl, Num}) ->
+            dubbo_extension:register(Key, Impl, Num)
+        end,
+        Defaults
+    ),
+    ok.
 env_init() ->
     ets:new(?PROVIDER_IMPL_TABLE, [public, named_table]),
     dubbo_traffic_control:init(),

Reply via email to