This is an automated email from the ASF dual-hosted git repository. dlive pushed a commit to branch 0.4.0 in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git
commit 216b5a7bffebcdeb922bc26bcfa96fca270290db Author: DLive <[email protected]> AuthorDate: Sat Jun 22 23:39:18 2019 +0800 dev reference ref process --- include/dubbo.hrl | 4 +- include/dubboerl.hrl | 10 +++-- src/dubbo_directory.erl | 29 ++++++++++++- src/dubbo_exchanger.erl | 18 +++----- src/dubbo_protocol_dubbo.erl | 69 +++++++++++++++++-------------- src/dubbo_provider_consumer_reg_table.erl | 45 ++++++++++++-------- 6 files changed, 107 insertions(+), 68 deletions(-) diff --git a/include/dubbo.hrl b/include/dubbo.hrl index 727c71e..f2d4048 100644 --- a/include/dubbo.hrl +++ b/include/dubbo.hrl @@ -102,8 +102,8 @@ -record(interface_list, {interface, pid, connection_info}). --record(provider_node_list, {host_flag, pid, weight, readonly = false}). --record(connection_info, {connection_id, pid, weight, host_flag, readonly = false}). +%%-record(provider_node_list, {host_flag, pid, weight, readonly = false}). +-record(connection_info, {host_flag, pid, weight, readonly = false}). -type dubbo_request() :: #dubbo_request{}. -type dubbo_response() :: #dubbo_response{}. \ No newline at end of file diff --git a/include/dubboerl.hrl b/include/dubboerl.hrl index ce6baf1..a9be5f1 100644 --- a/include/dubboerl.hrl +++ b/include/dubboerl.hrl @@ -14,11 +14,13 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ --define(PROVIDER_IMPL_TABLE,provider_impl_table). +-define(PROVIDER_IMPL_TABLE, provider_impl_table). --define(PROVIDER_WORKER,provider_worker). +-define(PROVIDER_WORKER, provider_worker). --define(TRAFFIC_CONTROL,traffic_control). +-define(TRAFFIC_CONTROL, traffic_control). --record(dubbo_url,{scheme,user_info,host,port,path,parameters,fragment}). \ No newline at end of file +-record(dubbo_url, {scheme, user_info, host, port, path, parameters, fragment}). + +-record(dubbo_invoker, {host_flag, handle}). \ No newline at end of file diff --git a/src/dubbo_directory.erl b/src/dubbo_directory.erl index 3674a91..07a6dff 100644 --- a/src/dubbo_directory.erl +++ b/src/dubbo_directory.erl @@ -17,7 +17,7 @@ -module(dubbo_directory). -behaviour(gen_server). - +-include("dubboerl.hrl"). -export([subscribe/2,notify/2]). %% API -export([start_link/0]). @@ -87,8 +87,23 @@ notify(Interface,UrlList)-> %% dubbo_consumer_pool:start_consumer(Interface, UrlList), ok. + refresh_invoker(UrlList)-> - NewInvokers = refresh_invoker(UrlList,[]). + case pick_interface(UrlList) of + {error,Reason}-> + fail; + {"empty",Interface}-> + todo_destroy; + {_,Interface} -> + OldProviderHosts = dubbo_provider_consumer_reg_table:get_interface_provider_node(Interface), + NewInvokers = refresh_invoker(UrlList,[]), + NewProviderHosts = [Item#dubbo_invoker.host_flag || Item <- NewInvokers], + DeleteProverList = OldProviderHosts -- NewProviderHosts, + dubbo_provider_consumer_reg_table:clean_invalid_provider(DeleteProverList) + + end. +%% OldProviderHosts = + refresh_invoker([Url|Rest],Acc)-> case dubbo_extension:run_fold(protocol,refer,[Url],undefined) of @@ -100,6 +115,16 @@ refresh_invoker([Url|Rest],Acc)-> refresh_invoker(Rest,Acc) end. +pick_interface([Url | _]) -> + case dubbo_common_fun:parse_url(Url) of + {ok,UrlInfo}-> + Interface = maps:get("interface",UrlInfo#dubbo_url.parameters), + {UrlInfo#dubbo_url.scheme,Interface}; + {error,Reason} -> + {error,Reason} + end. + + %%-------------------------------------------------------------------- %% @private %% @doc diff --git a/src/dubbo_exchanger.erl b/src/dubbo_exchanger.erl index 09a4833..6fb8496 100644 --- a/src/dubbo_exchanger.erl +++ b/src/dubbo_exchanger.erl @@ -21,20 +21,14 @@ %% API -export([connect/2]). -connect(Url,Handler) -> - case dubbo_node_config_util:parse_provider_info(Url) of - {ok, ProviderConfig} -> - HostFlag= dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig), - {ok, Pid} = dubbo_transport_pool_sup:add_children(ProviderConfig,Handler), - logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]), - {ok,#connection_info{ pid = Pid, weight = get_weight(ProviderConfig), host_flag = HostFlag}}; - {error, R1} -> - logger:error("parse provider info error reason ~p", [R1]), - {error,R1} - end. +connect(ProviderConfig, Handler) -> + HostFlag = dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig), + {ok, Pid} = dubbo_transport_pool_sup:add_children(ProviderConfig, Handler), + logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]), + {ok, #connection_info{pid = Pid, weight = get_weight(ProviderConfig), host_flag = HostFlag}}. -get_weight(_ProviderConfig)-> +get_weight(_ProviderConfig) -> %% todo get weight from provider info 30. \ No newline at end of file diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl index 1ede1d8..96947cc 100644 --- a/src/dubbo_protocol_dubbo.erl +++ b/src/dubbo_protocol_dubbo.erl @@ -26,50 +26,57 @@ refer(Url, Acc) -> {ok, UrlInfo} = dubbo_common_fun:parse_url(Url), case UrlInfo#dubbo_url.scheme of <<"dubbo">> -> - do_refer(UrlInfo), - {ok, todo}; + {ok,Invoker} = do_refer(UrlInfo), + {ok, Invoker}; _ -> {skip, Acc} end. do_refer(UrlInfo) -> - - ok. - - -getClients(ProviderUrl) -> - case new_transport(ProviderUrl) of - {ok,ConnectionInfoList} -> - ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig), - ok; - {error,Reason} -> - {error,Reason} + case dubbo_node_config_util:parse_provider_info(UrlInfo) of + {ok, ProviderConfig} -> +%% OldHostList = dubbo_provider_consumer_reg_table:get_interface_provider_node(ProviderConfig#provider_config.interface), + case getClients(ProviderConfig) of + {ok, ConnectionInfoList} -> + dubbo_provider_consumer_reg_table:update_node_conections(ProviderConfig#provider_config.interface,ConnectionInfoList), + HostFlag = dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig), + {ok,#dubbo_invoker{host_flag = HostFlag,handle = ?MODULE}}; + {error, Reason} -> + {error, Reason} + end; + {error, R1} -> + logger:error("parse provider info error reason ~p", [R1]), + {error, R1} end. +getClients(ProviderConfig) -> + %% @todo if connections parameter > 1, need new spec transport + case new_transport(ProviderConfig) of + {ok, ConnectionInfoList} -> +%% ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig), + {ok, ConnectionInfoList}; + {error, Reason} -> + {error, Reason} + end. %%ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true), -new_transport(ProviderUrl)-> - case dubbo_node_config_util:parse_provider_info(ProviderUrl) of - {ok, ProviderConfig} -> - HostFlag = get_host_flag(ProviderConfig), - case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config) of - [] -> - case dubbo_exchanger:connect(ProviderUrl,?MODULE) of - {ok,ConnectionInfo} -> - {ok,[ConnectionInfo]}; - {error,Reason} -> - logger:warning("start client fail ~p ~p",[Reason,HostFlag]), - {error,Reason} - end; - ConnectionInfoList -> - {ok,ConnectionInfoList} +new_transport(ProviderConfig) -> + + HostFlag = get_host_flag(ProviderConfig), + case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config) of + [] -> + case dubbo_exchanger:connect(ProviderConfig, ?MODULE) of + {ok, ConnectionInfo} -> + {ok, [ConnectionInfo]}; + {error, Reason} -> + logger:warning("start client fail ~p ~p", [Reason, HostFlag]), + {error, Reason} end; - {error, R1} -> - logger:error("parse provider info error reason ~p", [R1]), - {error,R1} + ConnectionInfoList -> + {ok, ConnectionInfoList} end. diff --git a/src/dubbo_provider_consumer_reg_table.erl b/src/dubbo_provider_consumer_reg_table.erl index c7a8dfa..d47260d 100644 --- a/src/dubbo_provider_consumer_reg_table.erl +++ b/src/dubbo_provider_consumer_reg_table.erl @@ -29,7 +29,8 @@ terminate/2, code_change/3]). --export([update_consumer_connections/2,get_host_connections/2, select_connection/1, select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2]). +-export([update_consumer_connections/2,update_node_conections/2,get_interface_provider_node/1,get_host_connections/2, select_connection/1, + select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2,clean_invalid_provider/1]). -include("dubbo.hrl"). -define(SERVER, ?MODULE). @@ -193,11 +194,7 @@ start_consumer(Interface, ProviderNodeInfo) -> get_host_connections(Host, Port) -> HostFlag = get_host_flag(Host, Port), List = ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag), - List2 = lists:map( - fun(#provider_node_list{host_flag = HostFlag,pid = Pid,readonly = Readonly}) -> - #connection_info{host_flag = HostFlag,pid = Pid,readonly = Readonly} - end, List), - List2. + List. %%%=================================================================== @@ -239,14 +236,29 @@ get_host_connections(Host, Port) -> %% end, ExecutesList), %% ConnectionList. + +update_node_conections(HostFlag,Connections)-> + lists:map( + fun(Item) -> + HostFlag= Item#connection_info.host_flag, + case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#connection_info{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of + '$end_of_table' -> + I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item), + logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]); + _ -> + ok + end + end, Connections), + ok. + update_consumer_connections(Interface, Connections) -> lists:map( fun(Item) -> HostFlag= Item#connection_info.host_flag, - case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#provider_node_list{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of + case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#connection_info{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of '$end_of_table' -> - I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag,pid = Item#connection_info.pid}), + I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item), logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]); {_ObjectList,_Continuation} -> ok @@ -269,7 +281,7 @@ update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]), case IsUpdateProvideNode of true -> - I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag, connection_info = Item}), + I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, Item), logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]); false -> ok @@ -320,18 +332,17 @@ clean_invalid_provider([HostFlag | DeleteProverList]) -> case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of [] -> ok; - ProviderNodeList -> - ProviderNodeList1 = dubbo_lists_util:del_duplicate(ProviderNodeList), - clean_connection_info(ProviderNodeList1) + ProviderNodeConnections -> + ProviderNodeConnections1 = dubbo_lists_util:del_duplicate(ProviderNodeConnections), + clean_connection_info(ProviderNodeConnections1) end, clean_invalid_provider(DeleteProverList). -clean_connection_info(ProviderNodeList) -> +clean_connection_info(ProviderNodeConnections) -> lists:map(fun(Item) -> - Pid = Item#provider_node_list.connection_info#connection_info.pid, - ConnectionId = Item#provider_node_list.connection_info#connection_info.connection_id, + Pid = Item#connection_info.pid, Pattern = #interface_list{pid = Pid, _ = '_'}, ets:delete_object(?INTERFCE_LIST_TABLE, Pattern), - dubbo_transport_pool_sup:stop_children(ConnectionId) - end, ProviderNodeList), + dubbo_transport_pool_sup:stop_children(Pid) + end, ProviderNodeConnections), ok. \ No newline at end of file
