This is an automated email from the ASF dual-hosted git repository. dlive pushed a commit to branch 0.4.0 in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git
commit 07e743aa315bd5a08f9694337f0ebf8f4f3da507 Author: DLive <[email protected]> AuthorDate: Mon Jun 10 23:31:34 2019 +0800 mod defined for plugin design --- config_example/sys.config | 2 +- include/dubboerl.hrl | 5 +- rebar.config | 5 +- rebar.lock | 4 + .../src/main/resources/applicationProvider.xml | 6 +- .../apps/dubbo_sample_service/src/userOperator.erl | 8 +- include/dubboerl.hrl => src/dubbo_cluster.erl | 8 +- .../dubboerl.hrl => src/dubbo_cluster_failfast.erl | 8 +- src/dubbo_common_fun.erl | 60 +++- src/dubbo_directory.erl | 173 ++++++++++++ src/dubbo_extension.erl | 189 +++++++++++++ include/dubboerl.hrl => src/dubbo_filter.erl | 8 +- src/dubbo_invoker.erl | 78 +---- .../dubboerl.hrl => src/dubbo_invoker_cluster.erl | 8 +- src/{dubbo_invoker.erl => dubbo_invoker_old.erl} | 2 +- include/dubboerl.hrl => src/dubbo_protocol.erl | 11 +- .../dubboerl.hrl => src/dubbo_protocol_dubbo.erl | 8 +- src/dubbo_protocol_registry.erl | 53 ++++ .../dubbo_provider_consumer_reg_table.erl | 8 +- src/dubbo_reference_config.erl | 74 +++++ src/{dubbo_common_fun.erl => dubbo_registry.erl} | 38 ++- src/dubbo_registry_zookeeper.erl | 314 +++++++++++++++++++++ test/userOperator.erl | 8 +- 23 files changed, 947 insertions(+), 131 deletions(-) diff --git a/config_example/sys.config b/config_example/sys.config index b100088..0ed8a87 100644 --- a/config_example/sys.config +++ b/config_example/sys.config @@ -23,7 +23,7 @@ {protocol,hessian}, {port,20881}, {consumer,[ - {<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]} + % {<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]} ]}, {provider,[ {dubbo_service_user_impl,userOperator,<<"org.apache.dubbo.erlang.sample.service.facade.UserOperator">>,[]} diff --git a/include/dubboerl.hrl b/include/dubboerl.hrl index 31204a5..ce6baf1 100644 --- a/include/dubboerl.hrl +++ b/include/dubboerl.hrl @@ -18,4 +18,7 @@ -define(PROVIDER_WORKER,provider_worker). --define(TRAFFIC_CONTROL,traffic_control). 
\ No newline at end of file +-define(TRAFFIC_CONTROL,traffic_control). + + +-record(dubbo_url,{scheme,user_info,host,port,path,parameters,fragment}). \ No newline at end of file diff --git a/rebar.config b/rebar.config index aebfaa3..2eb91f2 100644 --- a/rebar.config +++ b/rebar.config @@ -21,9 +21,10 @@ {deps, [ {erlzk, ".*", {git, "https://github.com/huaban/erlzk.git", {tag, "v0.6.2"}}}, - {ranch, ".*", {git, "https://github.com/ninenines/ranch.git", {tag, "1.4.0"}}}, + {ranch, ".*", {git, "https://github.com/ninenines/ranch.git", {tag, "1.4.0"}}}, {poolboy, ".*", {git, "https://github.com/devinus/poolboy.git", {tag, "1.5.1"}}}, - {jiffy, "0.15.1"} + {jiffy, "0.15.1"}, + {hooks,{git,"https://github.com/benoitc/hooks.git",{tag,"2.1.0"}}} ]}. diff --git a/rebar.lock b/rebar.lock index f57b258..cc2515b 100644 --- a/rebar.lock +++ b/rebar.lock @@ -3,6 +3,10 @@ {git,"https://github.com/huaban/erlzk.git", {ref,"aa7190ee2343ac1341cea3edc9b9eea36c591708"}}, 0}, + {<<"hooks">>, + {git,"https://github.com/benoitc/hooks.git", + {ref,"d4872554a27c0ee9c2166d18000f725f8c3dc8a8"}}, + 0}, {<<"jiffy">>,{pkg,<<"jiffy">>,<<"0.15.1">>},0}, {<<"poolboy">>, {git,"https://github.com/devinus/poolboy.git", diff --git a/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml b/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml index 974fabc..425e41e 100644 --- a/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml +++ b/samples/dubbo-sample-service/src/main/resources/applicationProvider.xml @@ -10,8 +10,8 @@ <dubbo:consumer check="false" timeout="300000" id="dubboConsumerConfig" retries="0"/> - <bean id="userService" class="org.apache.dubbo.erlang.sample.service.impl.UserOperatorImpl"/> - <dubbo:service interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" ref="userService"/> +<!-- <bean id="userService" class="org.apache.dubbo.erlang.sample.service.impl.UserOperatorImpl"/>--> +<!-- <dubbo:service 
interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" ref="userService"/>--> - <!-- <dubbo:reference id="userInterface" interface="UserOperator" retries="0" />--> + <dubbo:reference id="userInterface" interface="org.apache.dubbo.erlang.sample.service.facade.UserOperator" retries="0" /> </beans> diff --git a/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl b/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl index b92ca62..d7e0809 100644 --- a/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl +++ b/samples/dubboerl_demo/apps/dubbo_sample_service/src/userOperator.erl @@ -67,7 +67,7 @@ getUserInfo(Arg0, RequestOption)-> ] }, Request = dubbo_adapter:reference(Data), - dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). + dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). -spec genUserId()-> @@ -96,7 +96,7 @@ genUserId( RequestOption)-> ] }, Request = dubbo_adapter:reference(Data), - dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). + dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). -spec queryUserInfo(Arg0::#userInfoRequest{})-> @@ -127,7 +127,7 @@ queryUserInfo(Arg0, RequestOption)-> ] }, Request = dubbo_adapter:reference(Data), - dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). + dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). -spec queryUserList(Arg0::list())-> @@ -158,5 +158,5 @@ queryUserList(Arg0, RequestOption)-> ] }, Request = dubbo_adapter:reference(Data), - dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). + dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). 
diff --git a/include/dubboerl.hrl b/src/dubbo_cluster.erl similarity index 87% copy from include/dubboerl.hrl copy to src/dubbo_cluster.erl index 31204a5..da4031e 100644 --- a/include/dubboerl.hrl +++ b/src/dubbo_cluster.erl @@ -14,8 +14,8 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ --define(PROVIDER_IMPL_TABLE,provider_impl_table). +-module(dubbo_cluster). +-author("dlive"). --define(PROVIDER_WORKER,provider_worker). - --define(TRAFFIC_CONTROL,traffic_control). \ No newline at end of file +%% API +-export([]). diff --git a/include/dubboerl.hrl b/src/dubbo_cluster_failfast.erl similarity index 87% copy from include/dubboerl.hrl copy to src/dubbo_cluster_failfast.erl index 31204a5..6078517 100644 --- a/include/dubboerl.hrl +++ b/src/dubbo_cluster_failfast.erl @@ -14,8 +14,8 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ --define(PROVIDER_IMPL_TABLE,provider_impl_table). +-module(dubbo_cluster_failfast). +-author("dlive"). --define(PROVIDER_WORKER,provider_worker). - --define(TRAFFIC_CONTROL,traffic_control). \ No newline at end of file +%% API +-export([]). diff --git a/src/dubbo_common_fun.erl b/src/dubbo_common_fun.erl index 6744171..e74f57f 100644 --- a/src/dubbo_common_fun.erl +++ b/src/dubbo_common_fun.erl @@ -16,8 +16,9 @@ %%------------------------------------------------------------------------------ -module(dubbo_common_fun). +-include("dubboerl.hrl"). %% API --export([local_ip_v4/0, local_ip_v4_str/0]). +-export([local_ip_v4/0, local_ip_v4_str/0, parse_url/1, map_to_url/1]). 
local_ip_v4() -> {ok, Addrs} = inet:getifaddrs(), @@ -29,3 +30,60 @@ local_ip_v4() -> local_ip_v4_str() -> {V1, V2, V3, V4} = local_ip_v4(), list_to_binary(io_lib:format("~p.~p.~p.~p", [V1, V2, V3, V4])). + + +-spec(parse_url(Url :: binary()|list()) -> {ok, map()}). +parse_url(Url) when is_binary(Url) -> + parse_url(binary_to_list(Url)); +parse_url(Url) -> + case http_uri:parse(Url, []) of + {ok, {Scheme, _UserInfo, Host, Port, _Path, Query}} -> + QueryStr = case lists:prefix("?", Query) of + true -> + [_ | Query2] = Query, + Query2; + false -> + Query + end, + QueryListTmp = string:tokens(QueryStr, "&"), + Parameters = parse_url_parameter(QueryListTmp, #{}), + Result = #dubbo_url{scheme = Scheme, host = Host, port = Port, parameters = Parameters}, + {ok, Result}; + {error, R1} -> + {error, R1} + end. + + +parse_url_parameter([], Parameters) -> + Parameters; +parse_url_parameter([Item | Rest], Parameters) -> + case string:tokens(Item, "=") of + KeyPair when length(KeyPair) == 2 -> + [Key, Value] = KeyPair, + parse_url_parameter(Rest, maps:put(Key, Value, Parameters)); + KeyPair2 -> + logger:error("parse parameter error, keypair ~p", [KeyPair2]), + parse_url_parameter(Rest, Parameters) + end. + + +map_to_url(UrlInfo) -> + ParameterStr = + case UrlInfo#dubbo_url.parameters of + undefined -> + ""; + Parameter -> + KeyValues = maps:to_list(Parameter), + KeyValues2 = [io_lib:format("~s=~s", [Key, http_uri:encode(Value)]) || {Key, Value} <= KeyValues], + ParameterStr1 = string:join(KeyValues2, "&"), + ParameterStr2 = ["?" | ParameterStr1], + list_to_binary(ParameterStr2) + end, + Value = io_lib:format(<<"~s://~s/~s?~s">>, + [ + UrlInfo#dubbo_url.scheme, + UrlInfo#dubbo_url.host, + UrlInfo#dubbo_url.path, + ParameterStr + ]), + list_to_binary(Value). 
\ No newline at end of file diff --git a/src/dubbo_directory.erl b/src/dubbo_directory.erl new file mode 100644 index 0000000..2356e7c --- /dev/null +++ b/src/dubbo_directory.erl @@ -0,0 +1,173 @@ +%%------------------------------------------------------------------------------ +%% Licensed to the Apache Software Foundation (ASF) under one or more +%% contributor license agreements. See the NOTICE file distributed with +%% this work for additional information regarding copyright ownership. +%% The ASF licenses this file to You under the Apache License, Version 2.0 +%% (the "License"); you may not use this file except in compliance with +%% the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%------------------------------------------------------------------------------ +-module(dubbo_directory). + +-behaviour(gen_server). + +-export([subscribe/2,notify/1]). +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @end +%%-------------------------------------------------------------------- +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). 
+ +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +subscribe(RegistryName,SubcribeUrl)-> + try gen_server:call(?SERVER,{subscribe,RegistryName,SubcribeUrl},5000) of + ok-> + ok + catch + Error:Reason-> + %% todo improve erro type + {error,Reason} + end. + +notify(UrlList)-> + dubbo_consumer_pool:start_consumer(Interface, UrlList), + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @end +%%-------------------------------------------------------------------- +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({subscribe,RegistryName,SubcribeUrl}, _From, State) -> + NotifyFun= fun dubbo_directory:notify/1, + apply(RegistryName,subscribe,[SubcribeUrl,NotifyFun]), + {reply, ok, State}; +handle_call(_Request, _From, State) -> + {reply, ok, State}. 
+ +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @end +%%-------------------------------------------------------------------- +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State) -> + ok. 
+ +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + diff --git a/src/dubbo_extension.erl b/src/dubbo_extension.erl new file mode 100644 index 0000000..4e5eb0c --- /dev/null +++ b/src/dubbo_extension.erl @@ -0,0 +1,189 @@ +%%------------------------------------------------------------------------------ +%% Licensed to the Apache Software Foundation (ASF) under one or more +%% contributor license agreements. See the NOTICE file distributed with +%% this work for additional information regarding copyright ownership. +%% The ASF licenses this file to You under the Apache License, Version 2.0 +%% (the "License"); you may not use this file except in compliance with +%% the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%------------------------------------------------------------------------------ +-module(dubbo_extension). +-behaviour(gen_server). + +%% API +-export([run/3,run_fold/4,register/3,unregister/3]). + + +-export([start_link/0]). 
+ +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + + +-define(TAB, ?MODULE). + +-record(state, {}). + + +-spec reg(HookName::hookname(), Module::atom(),Priority::integer()) -> ok | {error, term()}. +register(HookName, Module,Priority) -> + gen_server:call(?MODULE, {register, HookName, {Priority, {Module}}}). + +-spec unregister(HookName::hookname(), Module::atom(),Priority::integer()) -> ok. +unregister(HookName, Module,Priority) -> + gen_server:call(?MODULE, {unregister, HookName, {Priority, Module}}). + +%% @doc run all hooks registered for the HookName. +%% Execution can be interrupted if an hook return the atom `stop'. +-spec run(HookName::hookname(), Args::list()) -> ok. +run(HookName,Fun, Args) -> + case find_hooks(HookName) of + no_hook -> ok; + Hooks -> run1(Hooks, HookName,Fun, Args) + end. + +run1([], _HookName,_Fun, _Args) -> + ok; +run1([M | Rest], HookName, Fun, Args) -> + Ret = (catch apply(M, Fun, Args)), + case Ret of + {'EXIT', Reason} -> + logger:error("~p~n error running extension: ~p~n", [HookName, Reason]), + run1(Rest, HookName,Fun, Args); + stop -> + ok; + _ -> + run1(Rest, HookName,Fun, Args) + end. + +-spec run_fold(HookName::hookname(), Args::list(), Acc::any()) -> Acc2::any(). +run_fold(HookName, Fun, Args, Acc) -> + case find_hooks(HookName) of + no_hook -> Acc; + Hooks -> run_fold1(Hooks,HookName, Fun, Args, Acc) + end. + + +run_fold1([], _HookName,_Fun, _Args, Acc) -> + Acc; +run_fold1([M | Rest], HookName,Fun, Args0, Acc) -> + Args = Args0 ++ [Acc], + Ret = (catch apply(M, Fun, Args)), + case Ret of + {'EXIT', Reason} -> + error_logger:error_msg("~p~n error running hook: ~p~n", + [HookName, Reason]), + run_fold1(Rest, HookName,Fun,Args0, Acc); + stop -> + Acc; + {stop, NewAcc} -> + NewAcc; + _ -> + run_fold1(Rest, HookName,Fun,Args0, Ret) + end. + + + + +%% @doc retrieve the lists of registered functions for an hook. +-spec find(HookName::hookname()) -> {ok, [{atom(), atom()}]} | error. 
+find(HookName) -> + case ?find_hook(HookName) of + no_hook -> error; + Hooks -> {ok, Hooks} + end. + +%% @hidden +start_link() -> + _ = init_tabs(), + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +init_tabs() -> + case ets:info(?TAB, name) of + undefined -> + ets:new(?TAB, [ordered_set, public, named_table, + {read_concurrency, true}, + {write_concurrency, true}]); + _ -> + true + end. + +%% @hidden +init([]) -> + {ok, #state{}}. + +%% @hidden +handle_call({register, HookName, {Priority, Module}}, _From, State) -> + do_register(HookName, {Priority, Module}), + {reply, ok, State}; +handle_call({unregister, HookName, {Priority, Module}}, _From, State) -> + do_unregister(HookName, {Priority, Module}), + {reply, ok, State}; +handle_call(_Msg, _From, State) -> + {reply, badarg, State}. + +%% @hidden +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +%% @hidden +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% @hidden +terminate(_Reason, _Srv) -> + ok. + +do_register(HookName, {_Priority, ModuleName}=Hook) -> + check_module(ModuleName), + update_hooks(HookName, [Hook]). + + +do_unregister(HookName, Hook) -> + remove_hooks(HookName, [Hook]), + ok. + +update_hooks(HookName, HookFuns) -> + case ets:lookup(?TAB, HookName) of + [] -> + true = ets:insert(?TAB, {HookName, HookFuns}); + [{_, Funs}] -> + Funs2 = lists:keysort(1, Funs ++ HookFuns), + true = ets:insert(?TAB, {HookName, Funs2}) + end. + +remove_hooks(HookName, HookFuns) -> + case ets:lookup(?TAB, HookName) of + [] -> + ok; + [{_, Funs}] -> + Funs2 = Funs -- HookFuns, + case Funs2 of + [] -> + ets:delete(?TAB, HookName); + _ -> + ets:insert(?TAB, {HookName, Funs2}) + end + end. + +check_module(ModuleName) -> + _ = code:ensure_loaded(ModuleName), + ok. + +find_hooks(HookName)-> + case ets:lookup(?TAB,HookName) of + []-> + no_hook; + [{_, Modules}]-> + Modules + end. 
diff --git a/include/dubboerl.hrl b/src/dubbo_filter.erl similarity index 87% copy from include/dubboerl.hrl copy to src/dubbo_filter.erl index 31204a5..7fa7950 100644 --- a/include/dubboerl.hrl +++ b/src/dubbo_filter.erl @@ -14,8 +14,8 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ --define(PROVIDER_IMPL_TABLE,provider_impl_table). +-module(dubbo_filter). +-author("dlive"). --define(PROVIDER_WORKER,provider_worker). - --define(TRAFFIC_CONTROL,traffic_control). \ No newline at end of file +%% API +-export([]). diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker.erl index b29751c..0a3527b 100644 --- a/src/dubbo_invoker.erl +++ b/src/dubbo_invoker.erl @@ -16,82 +16,8 @@ %%------------------------------------------------------------------------------ -module(dubbo_invoker). --include("dubbo.hrl"). %% API --export([invoke_request/2, invoke_request/3, invoke_request/5]). +-export([]). --spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}) -> - {ok, reference()}| - {ok, reference(), Data :: any(), RpcContent :: list()}| - {error, Reason :: timeout|no_provider|any()}. -invoke_request(Interface, Request) -> - invoke_request(Interface, Request, [], #{}, self()). --spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RequestOption :: map()) -> - {ok, reference()}| - {ok, reference(), Data :: any(), RpcContent :: list()}| - {error, Reason :: timeout|no_provider|any()}. -invoke_request(Interface, Request, RequestOption) -> - invoke_request(Interface, Request, maps:get(ctx, RequestOption, []), RequestOption, self()). 
- - --spec invoke_request(Interface :: binary(), Request :: #dubbo_request{}, RpcContext :: list(), RequestState :: map(), CallBackPid :: pid()) -> - {ok, reference()}| - {ok, reference(), Data :: any(), RpcContent :: list()}| - {error, Reason :: timeout|no_provider|request_full|any()}. -invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) -> - case dubbo_consumer_pool:select_connection(Interface, Request#dubbo_request.mid) of - {ok, #connection_info{pid = Pid, host_flag = HostFlag}} -> - case dubbo_traffic_control:check_goon(HostFlag, 199) of - ok -> - Request2 = merge_attachments(Request, RpcContext), - {ok, RequestData} = dubbo_codec:encode_request(Request2), - Ref = get_ref(RequestState), - gen_server:cast(Pid, {send_request, Ref, Request2, RequestData, CallBackPid, RequestState}), - case is_sync(RequestState) of - true -> - sync_receive(Ref, get_timeout(RequestState)); - false -> {ok, Ref} - end; - full -> - {error, request_full} - end; - {error, none} -> - logger:error("[INVOKE] ~p error Reason no_provider", [Interface]), - {error, no_provider} - end. - - -is_sync(Option) -> - maps:is_key(sync, Option). -get_ref(Option) -> - maps:get(ref, Option, make_ref()). - -get_timeout(Option) -> - maps:get(timeout, Option, ?REQUEST_TIME_OUT). - - -sync_receive(Ref, TimeOut) -> - receive - {'$gen_cast', {response_process, Ref, RpcContent, Response}} -> - {ok, Ref, Response, RpcContent} - after - TimeOut -> - {error, timeout} - end. 
-merge_attachments(#dubbo_request{data = null} = Request, _Option) -> - Request; -merge_attachments(Request, Option) -> - Attachements = Request#dubbo_request.data#dubbo_rpc_invocation.attachments, - case lists:keyfind(attachments, 1, Option) of - false -> OptionAttachments = []; - {attachments, OptionAttachments} -> - OptionAttachments - end, - List = [ - {<<"version">>, <<"0.0.0">>}, - {<<"timeout">>, <<"5000">>} - ], - Attachements2 = lists:merge3(Attachements, OptionAttachments, List), - Data2 = Request#dubbo_request.data#dubbo_rpc_invocation{attachments = Attachements2}, - Request#dubbo_request{data = Data2}. +-callback(invoke(Invoker,Invocation) -> ok). \ No newline at end of file diff --git a/include/dubboerl.hrl b/src/dubbo_invoker_cluster.erl similarity index 87% copy from include/dubboerl.hrl copy to src/dubbo_invoker_cluster.erl index 31204a5..906a0fa 100644 --- a/include/dubboerl.hrl +++ b/src/dubbo_invoker_cluster.erl @@ -14,8 +14,8 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ --define(PROVIDER_IMPL_TABLE,provider_impl_table). +-module(dubbo_invoker_cluster). +-author("dlive"). --define(PROVIDER_WORKER,provider_worker). - --define(TRAFFIC_CONTROL,traffic_control). \ No newline at end of file +%% API +-export([]). diff --git a/src/dubbo_invoker.erl b/src/dubbo_invoker_old.erl similarity index 99% copy from src/dubbo_invoker.erl copy to src/dubbo_invoker_old.erl index b29751c..c878656 100644 --- a/src/dubbo_invoker.erl +++ b/src/dubbo_invoker_old.erl @@ -14,7 +14,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ --module(dubbo_invoker). +-module(dubbo_invoker_old). -include("dubbo.hrl"). 
%% API diff --git a/include/dubboerl.hrl b/src/dubbo_protocol.erl similarity index 83% copy from include/dubboerl.hrl copy to src/dubbo_protocol.erl index 31204a5..3c82119 100644 --- a/include/dubboerl.hrl +++ b/src/dubbo_protocol.erl @@ -14,8 +14,13 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ --define(PROVIDER_IMPL_TABLE,provider_impl_table). +-module(dubbo_protocol). --define(PROVIDER_WORKER,provider_worker). +-callback refer(InterfaceClassInfo,Url)->ok. --define(TRAFFIC_CONTROL,traffic_control). \ No newline at end of file +%% API +-export([refer/2]). + + +refer(InterfaceClassInfo,Url)-> + dubbo_hooker(protocol_wapper,refer,[InterfaceClassInfo,Url]). \ No newline at end of file diff --git a/include/dubboerl.hrl b/src/dubbo_protocol_dubbo.erl similarity index 87% copy from include/dubboerl.hrl copy to src/dubbo_protocol_dubbo.erl index 31204a5..2242939 100644 --- a/include/dubboerl.hrl +++ b/src/dubbo_protocol_dubbo.erl @@ -14,8 +14,8 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ --define(PROVIDER_IMPL_TABLE,provider_impl_table). +-module(dubbo_protocol_dubbo). +-author("dlive"). --define(PROVIDER_WORKER,provider_worker). - --define(TRAFFIC_CONTROL,traffic_control). \ No newline at end of file +%% API +-export([]). diff --git a/src/dubbo_protocol_registry.erl b/src/dubbo_protocol_registry.erl new file mode 100644 index 0000000..f1d9bc4 --- /dev/null +++ b/src/dubbo_protocol_registry.erl @@ -0,0 +1,53 @@ +%%------------------------------------------------------------------------------ +%% Licensed to the Apache Software Foundation (ASF) under one or more +%% contributor license agreements. 
See the NOTICE file distributed with +%% this work for additional information regarding copyright ownership. +%% The ASF licenses this file to You under the Apache License, Version 2.0 +%% (the "License"); you may not use this file except in compliance with +%% the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%------------------------------------------------------------------------------ +-module(dubbo_protocol_registry). +-behaviour(dubbo_protocol). + +-include("dubboerl.hrl"). + +%% API +-export([]). + +refer(InterfaceClassInfo,Url)-> + {ok,UrlInfo} = dubbo_common_fun:parse_url(Url), + + {ok,RegistryName} = dubbo_registry:setup_register(UrlInfo), + + ConsumerUrl = gen_consumer_url(UrlInfo), + %% 通知directory + dubbo_registry:register(RegistryName,ConsumerUrl), + + dubbo_directory:subscribe(RegistryName,ConsumerUrl), + + %% return + ok. + + +gen_consumer_url(UrlInfo)-> + Parameters = UrlInfo#dubbo_url.parameters, + #{<<"refer">> := Refer} = Parameters, + Refer2 = http_uri:decode(Refer), + Parameters2 = dubbo_common_fun:parse_url(Refer2,#{}), + #{<<"interface">> := Interface} = Parameters2, + ConsumerUrlInfo = UrlInfo#dubbo_url{ + scheme = <<"consumer">>, + host = dubbo_common_fun:local_ip_v4_str(), + path = Interface, + parameters = Parameters2 + }, + ConsumerUrl = dubbo_common_fun:map_to_url(ConsumerUrlInfo), + ConsumerUrl. 
\ No newline at end of file diff --git a/include/dubboerl.hrl b/src/dubbo_provider_consumer_reg_table.erl similarity index 87% copy from include/dubboerl.hrl copy to src/dubbo_provider_consumer_reg_table.erl index 31204a5..3386cdc 100644 --- a/include/dubboerl.hrl +++ b/src/dubbo_provider_consumer_reg_table.erl @@ -14,8 +14,8 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ --define(PROVIDER_IMPL_TABLE,provider_impl_table). +-module(dubbo_provider_consumer_reg_table). +-author("dlive"). --define(PROVIDER_WORKER,provider_worker). - --define(TRAFFIC_CONTROL,traffic_control). \ No newline at end of file +%% API +-export([]). diff --git a/src/dubbo_reference_config.erl b/src/dubbo_reference_config.erl new file mode 100644 index 0000000..f6e8bdd --- /dev/null +++ b/src/dubbo_reference_config.erl @@ -0,0 +1,74 @@ +%%------------------------------------------------------------------------------ +%% Licensed to the Apache Software Foundation (ASF) under one or more +%% contributor license agreements. See the NOTICE file distributed with +%% this work for additional information regarding copyright ownership. +%% The ASF licenses this file to You under the Apache License, Version 2.0 +%% (the "License"); you may not use this file except in compliance with +%% the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%------------------------------------------------------------------------------ +-module(dubbo_reference_config). + +-record(dubbo_interface_info,{}). 
+ +%% API +-export([]). + +init_reference()-> + InitConfigMap= #{ + + }, + %% 组装各类需要数据 + ok. + + +create_proxy(InitConfigMap)-> + + + InterfaceClassInfo = #{}, + Para = gen_parameter(), + Url = gen_registry_url(Para), + dubbo_extension:run(protoco_wapper,refer,[InterfaceClassInfo,Url]), + ok. + + %%application=hello-world&dubbo=2.0.2&pid=68901&refer=application=hello-world&default.check=false&default.lazy=false&default.retries=0&default.sticky=false&default.timeout=300000&dubbo=2.0.2&interface=org.apache.dubbo.erlang.sample.service.facade.UserOperator&lazy=false&methods=queryUserInfo,queryUserList,genUserId,getUserInfo&pid=68901&register.ip=127.0.0.1&release=2.7.1&retries=0&side=consumer&sticky=false&timestamp=1559727789953&registry=zookeeper&release=2.7.1&timestamp=1559727842451 + + +gen_registry_url(Para)-> + %%todo 组装para & url + + Url = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=hello-world&dubbo=2.0.2&pid=68901&refer=application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127.0.0.1%26 [...] + Url. +gen_parameter()-> + Para = #{ + <<"application">> => get_appname(), + <<"dubbo">> => <<"2.0.2">>, + <<"pid">> => get_pid(), + <<"refer">> => get_refinfo(), + <<"registry">> => get_registry_type(), + <<"release">> => <<"2.7.1">>, + <<"timestamp">> => <<"1559727842451">> + }, + + Para. + +get_appname()-> + %%todo + <<"hello-world">>. +get_pid()-> + %%todo + <<"68901">>. 
+%% URL-encoded "refer" parameter; placeholder value until the todo is resolved.
+get_refinfo()->
+    %%todo
+    <<"application%3Dhello-world%26default.check%3Dfalse%26default.lazy%3Dfalse%26default.retries%3D0%26default.sticky%3Dfalse%26default.timeout%3D300000%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.erlang.sample.service.facade.UserOperator%26lazy%3Dfalse%26methods%3DqueryUserInfo%2CqueryUserList%2CgenUserId%2CgetUserInfo%26pid%3D68901%26register.ip%3D127.0.0.1%26release%3D2.7.1%26retries%3D0%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1559727789953">>.
+
+%% Registry type; placeholder value until the todo is resolved.
+get_registry_type()->
+    %%todo
+    <<"zookeeper">>.
\ No newline at end of file
diff --git a/src/dubbo_common_fun.erl b/src/dubbo_registry.erl
similarity index 50%
copy from src/dubbo_common_fun.erl
copy to src/dubbo_registry.erl
index 6744171..89eb64b 100644
--- a/src/dubbo_common_fun.erl
+++ b/src/dubbo_registry.erl
@@ -14,18 +14,34 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_common_fun).
+-module(dubbo_registry).
+-include("dubboerl.hrl").
+
+%% Behaviour implemented by registry back ends (modules named dubbo_registry_<scheme>).
+%% NOTE(review): setup_register/1 calls start/1 with the #dubbo_url{} record it
+%% was given, not with a binary - confirm which argument type is intended.
+-callback start(Url :: binary()) -> ok.
+-callback register(Url::binary())-> term().
+-callback subscribe(SubcribeUrl::binary(),NotifyFun::function())->ok.
 
 %% API
--export([local_ip_v4/0, local_ip_v4_str/0]).
+-export([setup_register/1,register/2]).
+
+%% Resolves the registry module for UrlInfo and calls its start/1 when no
+%% process is registered under the module name. The result of start/1 is
+%% ignored; the module name is returned in both cases.
+-spec(setup_register(UrlInfo :: #dubbo_url{}) -> {ok, RegistryProcessName :: atom()}|{error, term()}).
+setup_register(UrlInfo) ->
+    RegistryModuleName = get_registry_module(UrlInfo),
+    case whereis(RegistryModuleName) of
+        undefined ->
+            RegistryModuleName:start(UrlInfo);
+        _Pid ->
+            ok
+    end,
+    {ok, RegistryModuleName}.
+
+%% Logs the call and delegates to RegistryName:register(Url), returning its result.
+register(RegistryName,Url) ->
+    logger:info("call ~p register url ~p",[RegistryName,Url]),
+    RegistryName:register(Url).
-local_ip_v4() ->
-    {ok, Addrs} = inet:getifaddrs(),
-    hd([
-        Addr || {_, Opts} <- Addrs, {addr, Addr} <- Opts,
-        size(Addr) == 4, Addr =/= {127, 0, 0, 1}
-    ]).
-local_ip_v4_str() ->
-    {V1, V2, V3, V4} = local_ip_v4(),
-    list_to_binary(io_lib:format("~p.~p.~p.~p", [V1, V2, V3, V4])).
+%% Maps the scheme of a #dubbo_url{} record to the registry module name
+%% dubbo_registry_<scheme>. Only already-existing atoms are accepted, so an
+%% unknown scheme raises badarg instead of creating a new atom.
+get_registry_module(Info) ->
+    RegistryName = Info#dubbo_url.scheme,
+    %% The prefix is written as a plain string segment: a nested binary without
+    %% a /binary specifier would be treated as an integer segment and fail.
+    FullName = <<"dubbo_registry_", RegistryName/binary>>,
+    binary_to_existing_atom(FullName, utf8).
\ No newline at end of file
diff --git a/src/dubbo_registry_zookeeper.erl b/src/dubbo_registry_zookeeper.erl
new file mode 100644
index 0000000..ea9ef62
--- /dev/null
+++ b/src/dubbo_registry_zookeeper.erl
@@ -0,0 +1,314 @@
+%%------------------------------------------------------------------------------
+%% Licensed to the Apache Software Foundation (ASF) under one or more
+%% contributor license agreements.  See the NOTICE file distributed with
+%% this work for additional information regarding copyright ownership.
+%% The ASF licenses this file to You under the Apache License, Version 2.0
+%% (the "License"); you may not use this file except in compliance with
+%% the License.  You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%------------------------------------------------------------------------------
+-module(dubbo_registry_zookeeper).
+-behaviour(gen_server).
+-behaviour(dubbo_registry).
+
+-include("dubbo.hrl").
+-include("dubboerl.hrl").
+%% API
+-export([start_link/0, register_consumer/1, register_consumer/2, gen_consumer_node_info/1, register_provider/1, provider_watcher/1]).
+
+-export([start/1,register/1,subscribe/2]).
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+-define(SERVER, ?MODULE).
+
+%% zk_pid:     connection pid obtained from connection/0 in init/1.
+%% notify_fun: last callback stored by a subscribe_provider call.
+-record(state, {zk_pid,notify_fun}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server and registers it locally under ?SERVER.
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(start_link() ->
+    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+start_link() ->
+    RegisteredName = {local, ?SERVER},
+    gen_server:start_link(RegisteredName, ?MODULE, [], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server: opens the connection via connection/0 and
+%% keeps the returned pid in the state. A failed connection crashes
+%% init/1 on the {ok, _} match.
+%% @end
+%%--------------------------------------------------------------------
+-spec(init(Args :: term()) ->
+    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term()} | ignore).
+init([]) ->
+    {ok, ZkPid} = connection(),
+    InitialState = #state{zk_pid = ZkPid},
+    {ok, InitialState}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages. Every request, including unrecognised ones,
+%% is answered with ok:
+%%   {add_consumer, Interface, ConsumerUrl}          - add_consumer/3
+%%   {add_provider, Provider}                        - register_provider_path/2
+%%   {subscribe_provider, InterfaceName, NotifyFun}  - get_provider_list/3,
+%%       and NotifyFun is remembered in the state
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+    State :: #state{}) ->
+    {reply, Reply :: term(), NewState :: #state{}} |
+    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+
+handle_call({add_consumer, Interface, ConsumerUrl}, _From, State) ->
+    add_consumer(Interface, ConsumerUrl, State),
+    {reply, ok, State};
+handle_call({add_provider, ProviderConfig}, _From, State) ->
+    register_provider_path(ProviderConfig, State),
+    {reply, ok, State};
+handle_call({subscribe_provider, InterfaceName, NotifyFun}, _From, #state{zk_pid = ZkPid} = State) ->
+    get_provider_list(InterfaceName, ZkPid, NotifyFun),
+    {reply, ok, State#state{notify_fun = NotifyFun}};
+
+handle_call(_Request, _From, State) ->
+    {reply, ok, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages. {provider_node_change, Interface, Path} is
+%% sent by provider_watcher/1; the children of Path are fetched again,
+%% which also installs a fresh watcher.
+%% NOTE(review): the fetched child list is discarded and the stored
+%% notify_fun is not called here - confirm whether subscribers should
+%% be notified on change.
+%%
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_cast(Request :: term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_cast({provider_node_change, Interface, Path}, #state{zk_pid = ZkPid} = State) ->
+    _Children = get_provider_and_start(ZkPid, Interface, Path),
+    {noreply, State};
+handle_cast(_Request, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages: the message is logged at info
+%% level and otherwise ignored.
+%% @end
+%%--------------------------------------------------------------------
+-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_info(Info, State) ->
+    logger:info("zk server recv msg:~p", [Info]),
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Called by gen_server when the process is about to terminate.
+%% No cleanup is performed; the return value is ignored by gen_server.
+%% @end
+%%--------------------------------------------------------------------
+-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+    State :: #state{}) -> term()).
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed. The state is returned
+%% unchanged.
+%% @end
+%%--------------------------------------------------------------------
+-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
+    Extra :: term()) ->
+    {ok, NewState :: #state{}} | {error, Reason :: term()}).
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+
+%%----------------------------------------------
+%% dubbo_registry
+%%----------------------------------------------
+%% dubbo_registry start/1 callback. Currently a stub that ignores its
+%% argument and returns ok.
+%% NOTE(review): nothing is started here, although
+%% dubbo_registry:setup_register/1 calls start/1 when no process is
+%% registered under this module's name - confirm whether start_link/0
+%% should be triggered from here.
+start(_Url) ->
+    ok.
+%% dubbo_registry register/1 callback. Parses Url, reads its "interface"
+%% parameter and dispatches on the URL scheme. Crashes if the URL cannot be
+%% parsed, has no "interface" parameter, or has a scheme other than
+%% <<"consumer">> / <<"provider">>.
+register(Url)->
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(Url),
+    InterfaceName = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+    register(UrlInfo#dubbo_url.scheme, InterfaceName, Url),
+    ok.
+
+%% Scheme-specific registration. Consumers are handed to the registry server;
+%% provider registration through this path is not implemented yet.
+register(<<"consumer">>, InterfaceName, Url)->
+    gen_server:call(?SERVER, {add_consumer, InterfaceName, Url}),
+    ok;
+register(<<"provider">>, _InterfaceName, _Url)->
+    ok.
+
+%% dubbo_registry subscribe/2 callback. Asks the registry server (5 s call
+%% timeout) to fetch the providers of the URL's "interface" parameter and to
+%% pass them to NotifyFun. Returns ok, or {error, Reason} when the call raises.
+subscribe(SubcribeUrl,NotifyFun)->
+    {ok, UrlInfo} = dubbo_common_fun:parse_url(SubcribeUrl),
+    InterfaceName = maps:get(<<"interface">>, UrlInfo#dubbo_url.parameters),
+    try gen_server:call(?SERVER, {subscribe_provider, InterfaceName, NotifyFun}, 5000) of
+        ok ->
+            ok
+    catch
+        _Class:Reason ->
+            %%todo improve error type
+            {error, Reason}
+    end.
+
+%% Registers a #consumer_config{} with the registry server.
+%% The request carries the interface name and the generated consumer URL,
+%% which is the {add_consumer, Interface, ConsumerUrl} shape the server
+%% handles; a two-element request would fall into its catch-all clause and
+%% register nothing.
+register_consumer(Consumer) ->
+    Interface = Consumer#consumer_config.interface,
+    ConsumerUrl = gen_consumer_node_info(Consumer),
+    gen_server:call(?SERVER, {add_consumer, Interface, ConsumerUrl}),
+    ok.
+%% Convenience wrapper that registers a consumer for interface Name with a
+%% fixed test method list. The second argument is currently ignored.
+register_consumer(Name, _Option) ->
+    Consumer = #consumer_config{interface = Name, methods = [<<"testa">>, <<"testb">>]},
+    register_consumer(Consumer),
+    ok.
+%% Registers a provider configuration with the registry server.
+register_provider(Provider) ->
+    gen_server:call(?SERVER, {add_provider, Provider}),
+    ok.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+%% Connects to the servers listed in the dubboerl application's
+%% zookeeper_list setting, with the calling process as monitor.
+%% Crashes if the setting is missing or the connection fails.
+connection() ->
+    {ok, ServerList} = application:get_env(dubboerl, zookeeper_list),
+    ConnectOptions = [{chroot, "/"}, {monitor, self()}],
+    {ok, Pid} = erlzk:connect(ServerList, 30000, ConnectOptions),
+    {ok, Pid}.
+
+%% Creates /dubbo/<InterfaceName>/consumers/<escaped ConsumerUrl>, creating
+%% missing parent nodes on the way. Always returns ok.
+add_consumer(InterfaceName,ConsumerUrl, State) ->
+    Pid = State#state.zk_pid,
+    %% The URL becomes a single path segment, so it is URI-escaped first.
+    ConsumerNode = list_to_binary(edoc_lib:escape_uri(binary_to_list(ConsumerUrl))),
+    check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {InterfaceName, p}, {<<"consumers">>, p}, {ConsumerNode, e}]),
+    %% todo: fetch the provider list once the consumer node exists
+    ok.
+%% Creates /dubbo/<interface>/providers/<provider node> for Provider, creating
+%% missing parent nodes on the way. Always returns ok.
+%% The p / e tags are handed unchanged to erlzk:create/3 (presumably
+%% persistent / ephemeral - confirm against erlzk).
+register_provider_path(Provider, State) ->
+    #state{zk_pid = Pid} = State,
+    ProviderNode = dubbo_node_config_util:gen_provider_info(Provider),
+    check_and_create_path(Pid, <<"">>, [{<<"dubbo">>, p}, {Provider#provider_config.interface, p}, {<<"providers">>, p}, {ProviderNode, e}]),
+    ok.
+
+
+%% Fetches the children of /dubbo/<InterfaceName>/providers and passes the
+%% list (empty on error) to NotifyFun. Always returns ok.
+get_provider_list(InterfaceName,ZkPid,NotifyFun) ->
+    InterfacePath = <<"/dubbo/", InterfaceName/binary, "/providers">>,
+    ChildList = get_provider_and_start(ZkPid, InterfaceName, InterfacePath),
+    NotifyFun(ChildList),
+    ok.
+%% Returns the children of Path, or [] when the lookup fails. Each call spawns
+%% a new provider_watcher/1 process and hands it to erlzk:get_children/3 as
+%% the watcher.
+get_provider_and_start(Pid, Interface, Path) ->
+    Watcher = spawn(dubbo_registry_zookeeper, provider_watcher, [Interface]),
+    case erlzk:get_children(Pid, Path, Watcher) of
+        {ok, ChildList} ->
+            logger:debug("get provider list ~p", [ChildList]),
+            ChildList;
+        {error, R1} ->
+            %% The format has two ~p directives, so both the path and the
+            %% reason are supplied.
+            logger:debug("[add_consumer] get_provider_list error ~p ~p", [Path, R1]),
+            []
+    end.
+
+%% Body of the watcher process: waits for a single two-element message.
+%% A node_children_changed event is forwarded to the registry server as a
+%% provider_node_change cast; any other event (e.g. {node_created, "/a"}) is
+%% only logged. The process then ends.
+provider_watcher(Interface) ->
+    receive
+        {node_children_changed, Path} ->
+            gen_server:cast(?SERVER, {provider_node_change, Interface, Path}),
+            logger:debug("provider_watcher get event ~p ~p", [node_children_changed, Path]);
+        {Event, Path} ->
+            logger:debug("provider_watcher get event ~p ~p", [Event, Path])
+    end,
+    ok.
+
+
+%% Creates Path with the given create type and logs the outcome.
+%% Returns ok whether or not the node could be created.
+create_path(Pid, Path, CreateType) ->
+    case erlzk:create(Pid, Path, CreateType) of
+        {ok, ActualPath} ->
+            logger:debug("[add_consumer] create zk path success ~p", [ActualPath]);
+        {error, R1} ->
+            logger:debug("[add_consumer] create zk path error ~p ~p", [Path, R1])
+    end,
+    ok.
+%% Walks the list of {Segment, CreateType} pairs below RootPath, creating each
+%% node that erlzk:exists/2 reports as missing. Other lookup errors are logged
+%% and the walk continues with the next segment. Returns ok.
+check_and_create_path(_Pid, _RootPath, []) ->
+    ok;
+check_and_create_path(Pid, RootPath, [{Item, CreateType} | Rst]) ->
+    CheckPath = <<RootPath/binary, "/", Item/binary>>,
+    case erlzk:exists(Pid, CheckPath) of
+        {ok, _Stat} ->
+            ok;
+        {error, no_node} ->
+            logger:debug("[add_consumer] check_and_create_path unexist no_node ~p", [CheckPath]),
+            create_path(Pid, CheckPath, CreateType);
+        {error, R1} ->
+            logger:debug("[add_consumer] check_and_create_path unexist ~p", [R1])
+    end,
+    check_and_create_path(Pid, CheckPath, Rst).
+
+%% Builds the consumer:// URL (as a binary) for a #consumer_config{} record,
+%% using the local IPv4 address and the current timestamp in milliseconds.
+gen_consumer_node_info(Consumer) ->
+    %% What is the purpose of the "revision" parameter? Not added for now.
+    Methods = dubbo_lists_util:join(Consumer#consumer_config.methods, <<",">>),
+    %% Eleven directives for eleven arguments; the last one is the timestamp.
+    Value = io_lib:format(<<"consumer://~s/~s?application=~s&category=~s&check=~p&default.timeout=~p&dubbo=~s&interface=~s&methods=~s&side=~s&timestamp=~p">>,
+        [dubbo_common_fun:local_ip_v4_str(),
+            Consumer#consumer_config.interface,
+            Consumer#consumer_config.application,
+            Consumer#consumer_config.category,
+            Consumer#consumer_config.check,
+            Consumer#consumer_config.default_timeout,
+            Consumer#consumer_config.dubbo_version,
+            Consumer#consumer_config.interface,
+            Methods,
+            Consumer#consumer_config.side,
+            dubbo_time_util:timestamp_ms()
+        ]),
+    list_to_binary(Value).
+
+%% Example: dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
+%% Delegates to dubbo_consumer_pool:start_consumer/2 for the given interface
+%% and provider list.
+start_provider_process(Interface, ProviderList) ->
+    dubbo_consumer_pool:start_consumer(Interface, ProviderList).
\ No newline at end of file
diff --git a/test/userOperator.erl b/test/userOperator.erl
index 0d35917..e8567ae 100644
--- a/test/userOperator.erl
+++ b/test/userOperator.erl
@@ -67,7 +67,7 @@ getUserInfo(Arg0, RequestOption)->
         ]
     },
     Request = dubbo_adapter:reference(Data),
-    dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
+    dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption).
-spec genUserId()-> @@ -96,7 +96,7 @@ genUserId( RequestOption)-> ] }, Request = dubbo_adapter:reference(Data), - dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). + dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). -spec queryUserInfo(Arg0::#userInfoRequest{})-> @@ -127,7 +127,7 @@ queryUserInfo(Arg0, RequestOption)-> ] }, Request = dubbo_adapter:reference(Data), - dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). + dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). -spec queryUserList(Arg0::list())-> @@ -158,7 +158,7 @@ queryUserList(Arg0, RequestOption)-> ] }, Request = dubbo_adapter:reference(Data), - dubbo_invoker:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). + dubbo_invoker_old:invoke_request(?CURRENT_CLASS_NAME,Request,RequestOption). test() -> queryUserInfo(#userInfoRequest{username = "name", requestId = "111"}, #{sync=> true}). \ No newline at end of file
