fixeria has uploaded this change for review. ( https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/41034?usp=email )
Change subject: enb_registry: new module ...................................................................... enb_registry: new module The purpose of this new module is to maintain a centralized eNB registry, that can be easily queried via the upcoming REST interface. This avoids the need to fetch the eNB connection list from the sctp_server process and then query the individual processes handling each connection. Change-Id: I77a5a750ca6342da3a99926a44926b3973ab19c4 Related: SYS#7066 --- A src/enb_registry.erl M src/osmo_s1gw_sup.erl M src/s1ap_proxy.erl M src/sctp_proxy.erl M src/sctp_server.erl M test/s1ap_proxy_test.erl 6 files changed, 377 insertions(+), 34 deletions(-) git pull ssh://gerrit.osmocom.org:29418/erlang/osmo-s1gw refs/changes/34/41034/1 diff --git a/src/enb_registry.erl b/src/enb_registry.erl new file mode 100644 index 0000000..edd9f3d --- /dev/null +++ b/src/enb_registry.erl @@ -0,0 +1,308 @@ +%% Copyright (C) 2025 by sysmocom - s.f.m.c. GmbH <i...@sysmocom.de> +%% Author: Vadim Yanitskiy <vyanits...@sysmocom.de> +%% +%% All Rights Reserved +%% +%% SPDX-License-Identifier: AGPL-3.0-or-later +%% +%% This program is free software; you can redistribute it and/or modify +%% it under the terms of the GNU Affero General Public License as +%% published by the Free Software Foundation; either version 3 of the +%% License, or (at your option) any later version. +%% +%% This program is distributed in the hope that it will be useful, +%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%% GNU General Public License for more details. +%% +%% You should have received a copy of the GNU Affero General Public License +%% along with this program. If not, see <https://www.gnu.org/licenses/>. 
+%%
+%% Additional Permission under GNU AGPL version 3 section 7:
+%%
+%% If you modify this Program, or any covered work, by linking or
+%% combining it with runtime libraries of Erlang/OTP as released by
+%% Ericsson on https://www.erlang.org (or a modified version of these
+%% libraries), containing parts covered by the terms of the Erlang Public
+%% License (https://www.erlang.org/EPLICENSE), the licensors of this
+%% Program grant you additional permission to convey the resulting work
+%% without the need to license the runtime libraries of Erlang/OTP under
+%% the GNU Affero General Public License. Corresponding Source for a
+%% non-source form of such a combination shall include the source code
+%% for the parts of the runtime libraries of Erlang/OTP used as well as
+%% that of the covered work.
+
+%% Centralized eNB registry: a locally registered gen_server that keeps
+%% one enb_info() map per registered eNB handler process, indexed by an
+%% integer handle.  Registrants report state changes via enb_event/2;
+%% consumers read the data via fetch_enb_info/1 and fetch_enb_list/0,1.
+
+-module(enb_registry).
+-behaviour(gen_server).
+
+-export([init/1,
+         handle_info/2,
+         handle_call/3,
+         handle_cast/2,
+         terminate/2]).
+-export([start_link/0,
+         enb_register/0,
+         enb_unregister/1,
+         enb_event/2,
+         fetch_enb_info/1,
+         fetch_enb_list/0,
+         fetch_enb_list/1,
+         shutdown/0]).
+
+-include_lib("kernel/include/logger.hrl").
+
+
+%% Opaque-looking key identifying one registered eNB
+-type enb_handle() :: non_neg_integer().
+
+-type enb_state() :: connecting     %% S1GW -> MME connection in progress
+                   | connected      %% S1GW -> MME connection established
+                   | s1setup.       %% S1 SETUP procedure completed
+
+%% Events accepted by enb_event/2; each one carries the data
+%% that gets stored in (or merged into) the eNB's enb_info()
+-type enb_event() :: {connecting, sctp_server:conn_info()}
+                   | {connected, sctp_proxy:conn_info()}
+                   | {s1setup, s1ap_proxy:enb_info()}.
+
+%% Filters accepted by fetch_enb_list/1
+-type enb_filter() :: {genb_id_str, string()}
+                    | {enb_sctp_aid, gen_sctp:assoc_id()}
+                    | {mme_sctp_aid, gen_sctp:assoc_id()}
+                    | {enb_addr_port, sctp_server:addr_port()}
+                    | {mme_addr_port, sctp_server:addr_port()}.
+
+-type enb_info() :: #{pid := pid(),                             %% pid() of the registrant
+                      mon_ref := reference(),                   %% monitor() reference
+                      state := enb_state(),                     %% connection state
+                      reg_time := integer(),                    %% registration time (monotonic)
+                      genb_id_str => string(),                  %% Global-eNB-ID
+                      enb_id => s1ap_proxy:enb_id(),            %% eNB-ID
+                      plmn_id => s1ap_proxy:plmn_id(),          %% PLMN-ID
+                      enb_conn_info => sctp_server:conn_info(), %% eNB -> S1GW connection info
+                      mme_conn_info => sctp_proxy:conn_info()   %% S1GW -> MME connection info
+                     }.
+
+-record(state, {enbs :: #{enb_handle() => enb_info()},  %% handle -> eNB info
+                pids :: #{pid() => enb_handle()},       %% reverse index: registrant -> handle
+                next_handle :: enb_handle()             %% handle to hand out next
+               }).
+
+%% enb_event() and enb_filter() are exported too, because they appear
+%% in the specs of the exported enb_event/2 and fetch_enb_list/1
+-export_type([enb_handle/0,
+              enb_state/0,
+              enb_event/0,
+              enb_filter/0,
+              enb_info/0]).
+
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+%% Start the registry and register it locally under the module name.
+-spec start_link() -> gen_server:start_ret().
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+%% Register the *calling* process as an eNB handler (synchronous).
+%% On success a fresh handle is returned; it is the key expected
+%% by all the other API functions.
+-spec enb_register() -> {ok, enb_handle()} | {error, term()}.
+enb_register() ->
+    gen_server:call(?MODULE, ?FUNCTION_NAME).
+
+
+%% Remove the eNB identified by the given handle (synchronous).
+-spec enb_unregister(enb_handle()) -> ok | {error, term()}.
+enb_unregister(Handle) ->
+    gen_server:call(?MODULE, {?FUNCTION_NAME, Handle}).
+
+
+%% Report an event for the given eNB.  This is an asynchronous cast:
+%% it always returns ok, even if the handle is not (or no longer) known.
+-spec enb_event(enb_handle(), enb_event()) -> ok.
+enb_event(Handle, Event) ->
+    gen_server:cast(?MODULE, {?FUNCTION_NAME, Handle, Event}).
+
+
+%% Look up a single eNB by its handle; returns 'error' if not found.
+-spec fetch_enb_info(enb_handle()) -> {ok, enb_info()} | error.
+fetch_enb_info(Handle) ->
+    gen_server:call(?MODULE, {?FUNCTION_NAME, Handle}).
+
+
+%% Fetch all registered eNBs as a list of {Handle, EnbInfo} pairs.
+-spec fetch_enb_list() -> [{Handle, EnbInfo}]
+    when Handle :: enb_handle(),
+         EnbInfo :: enb_info().
+fetch_enb_list() ->
+    gen_server:call(?MODULE, ?FUNCTION_NAME).
+
+%% Same as fetch_enb_list/0, but return only the eNBs matching the filter.
+-spec fetch_enb_list(Filter) -> [{Handle, EnbInfo}]
+    when Filter :: enb_filter(),
+         Handle :: enb_handle(),
+         EnbInfo :: enb_info().
+fetch_enb_list(Filter) ->
+    gen_server:call(?MODULE, {?FUNCTION_NAME, Filter}).
+
+
+%% Stop the registry process.
+-spec shutdown() -> ok.
+shutdown() ->
+    gen_server:stop(?MODULE).
+
+
+%% ------------------------------------------------------------------
+%% gen_server API
+%% ------------------------------------------------------------------
+
+%% Start with an empty registry; handles are handed out starting from 0.
+init([]) ->
+    {ok, #state{enbs = maps:new(),
+                pids = maps:new(),
+                next_handle = 0}}.
+
+
+%% enb_register: register the calling process (taken from the From tuple)
+%% under the next free handle and start monitoring it.
+handle_call(enb_register,
+            {Pid, _Tag},
+            #state{enbs = ENBs,
+                   pids = PIDs,
+                   next_handle = Handle} = S) ->
+    case maps:find(Pid, PIDs) of
+        {ok, OldHandle} ->
+            %% NOTE: a fresh variable is required here.  Handle is already
+            %% bound to next_handle, so matching on {ok, Handle} could never
+            %% succeed for an existing registration (case_clause crash).
+            ?LOG_ERROR("eNB (handle=~p, pid=~p) is *already* registered", [OldHandle, Pid]),
+            {reply, {error, already_registered}, S};
+        error ->
+            %% keep an eye on the process being registered
+            MonRef = erlang:monitor(process, Pid),
+            %% create and store an initial eNB state
+            EnbInfo = #{pid => Pid,
+                        mon_ref => MonRef,
+                        state => connecting,
+                        reg_time => erlang:monotonic_time()},
+            ?LOG_INFO("eNB (handle=~p) registered", [Handle]),
+            {reply, {ok, Handle}, S#state{enbs = ENBs#{Handle => EnbInfo},
+                                          pids = PIDs#{Pid => Handle},
+                                          next_handle = Handle + 1}}
+    end;
+
+%% enb_unregister: drop the eNB from both maps and stop monitoring it.
+handle_call({enb_unregister, Handle},
+            _From,
+            #state{enbs = ENBs,
+                   pids = PIDs} = S) ->
+    case maps:find(Handle, ENBs) of
+        {ok, #{pid := Pid,
+               mon_ref := MonRef}} ->
+            %% [flush] also removes a 'DOWN' message that may already be queued
+            erlang:demonitor(MonRef, [flush]),
+            ?LOG_INFO("eNB (handle=~p) unregistered", [Handle]),
+            {reply, ok, S#state{enbs = maps:remove(Handle, ENBs),
+                                pids = maps:remove(Pid, PIDs)}};
+        error ->
+            ?LOG_ERROR("eNB (handle=~p) is *not* registered", [Handle]),
+            {reply, {error, enb_not_registered}, S}
+    end;
+
+%% fetch_enb_info: {ok, EnbInfo} | error, exactly as returned by maps:find/2.
+handle_call({fetch_enb_info, Handle},
+            _From,
+            #state{enbs = ENBs} = S) ->
+    Reply = maps:find(Handle, ENBs),
+    {reply, Reply, S};
+
+%% fetch_enb_list: all registered eNBs as [{Handle, EnbInfo}].
+handle_call(fetch_enb_list,
+            _From,
+            #state{enbs = ENBs} = S) ->
+    Reply = maps:to_list(ENBs),
+    {reply, Reply, S};
+
+%% {fetch_enb_list, Filter}: only the eNBs accepted by enb_filter(Filter).
+handle_call({fetch_enb_list, Filter},
+            _From,
+            #state{enbs = ENBs} = S) ->
+    Filtered = maps:filter(enb_filter(Filter), ENBs),
+    Reply = maps:to_list(Filtered),
+    {reply, Reply, S};
+
+handle_call(Info, From, S) ->
+    ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Info]),
+    {reply, {error, not_implemented}, S}.
+
+
+%% enb_event: apply the event to the stored eNB info (if the handle is known).
+handle_cast({enb_event, Handle, Event},
+            #state{enbs = ENBs} = S) ->
+    case maps:find(Handle, ENBs) of
+        {ok, EnbInfo0} ->
+            ?LOG_INFO("eNB (handle=~p) event: ~p", [Handle, Event]),
+            EnbInfo1 = enb_handle_event(EnbInfo0, Event),
+            {noreply, S#state{enbs = maps:update(Handle, EnbInfo1, ENBs)}};
+        error ->
+            ?LOG_ERROR("eNB (handle=~p) is *not* registered", [Handle]),
+            {noreply, S}
+    end;
+
+handle_cast(Info, S) ->
+    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
+    {noreply, S}.
+
+
+%% A monitored registrant has terminated: remove its entry from both maps.
+handle_info({'DOWN', _MonRef, process, Pid, Reason},
+            #state{enbs = ENBs,
+                   pids = PIDs} = S) ->
+    ?LOG_INFO("eNB process (pid=~p) terminated with reason ~p", [Pid, Reason]),
+    case maps:find(Pid, PIDs) of
+        {ok, Handle} ->
+            %% NOTE: the value stored in PIDs is the handle, not the pid.
+            %% Matching on {ok, Pid} here could never succeed and would
+            %% crash the registry with case_clause.
+            ?LOG_INFO("eNB (handle=~p) has been unregistered", [Handle]),
+            {noreply, S#state{enbs = maps:remove(Handle, ENBs),
+                              pids = maps:remove(Pid, PIDs)}};
+        error ->
+            ?LOG_ERROR("eNB (pid=~p) is *not* registered", [Pid]),
+            {noreply, S}
+    end;
+
+handle_info(Info, S) ->
+    ?LOG_ERROR("unknown ~p(): ~p", [?FUNCTION_NAME, Info]),
+    {noreply, S}.
+
+
+terminate(Reason, _S) ->
+    ?LOG_NOTICE("Terminating, reason ~p", [Reason]),
+    ok.
+
+
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+%% Apply an event to an eNB info map: update the 'state' field and store
+%% the data carried by the event.  Unknown events are logged and ignored.
+-spec enb_handle_event(enb_info(), enb_event()) -> enb_info().
+enb_handle_event(EnbInfo, {connecting, ConnInfo}) ->
+    EnbInfo#{state => connecting,
+             enb_conn_info => ConnInfo};
+
+enb_handle_event(EnbInfo, {connected, ConnInfo}) ->
+    EnbInfo#{state => connected,
+             mme_conn_info => ConnInfo};
+
+enb_handle_event(EnbInfo, {s1setup, Info}) ->
+    %% the keys of Info are merged into the top level of the eNB info map
+    maps:merge(EnbInfo#{state => s1setup}, Info);
+
+enb_handle_event(EnbInfo, Event) ->
+    ?LOG_ERROR("Unhandled event: ~p", [Event]),
+    EnbInfo.
+
+
+%% Build a predicate for maps:filter/2 from a filter specification.
+%% The predicate receives (Handle, EnbInfo) and returns true for the
+%% entries to keep.  An unknown filter is logged and matches nothing.
+-spec enb_filter(enb_filter()) -> fun((enb_handle(), enb_info()) -> boolean()).
+enb_filter({genb_id_str, GlobalENBId}) ->
+    fun(_, Item) -> enb_filter_by_field({genb_id_str, GlobalENBId}, Item) end;
+
+enb_filter({enb_sctp_aid, Aid}) ->
+    fun(_, Item) -> enb_filter_by_sub_field({enb_conn_info, aid, Aid}, Item) end;
+
+enb_filter({mme_sctp_aid, Aid}) ->
+    fun(_, Item) -> enb_filter_by_sub_field({mme_conn_info, mme_aid, Aid}, Item) end;
+
+%% NOTE: both the address and the port must match, hence 'andalso'.
+%% Separating the two checks with ',' would discard the result of
+%% the address comparison and filter by port only.
+enb_filter({enb_addr_port, {Addr, Port}}) ->
+    fun(_, Item) -> enb_filter_by_sub_field({enb_conn_info, addr, Addr}, Item) andalso
+                    enb_filter_by_sub_field({enb_conn_info, port, Port}, Item) end;
+
+enb_filter({mme_addr_port, {Addr, Port}}) ->
+    fun(_, Item) -> enb_filter_by_sub_field({mme_conn_info, mme_addr, Addr}, Item) andalso
+                    enb_filter_by_sub_field({mme_conn_info, mme_port, Port}, Item) end;
+
+enb_filter(Filter) ->
+    ?LOG_ERROR("Unknown eNB filter: ~p", [Filter]),
+    fun(_, _) -> false end.
+
+
+%% true if the top-level Field of EnbInfo is exactly equal to Value;
+%% a missing field is compared as 'undefined'
+enb_filter_by_field({Field, Value}, EnbInfo) ->
+    maps:get(Field, EnbInfo, undefined) =:= Value.
+
+
+%% true if Field of the nested map stored under the key Map is exactly
+%% equal to Value; a missing nested map is treated as an empty one
+enb_filter_by_sub_field({Map, Field, Value}, EnbInfo) ->
+    M = maps:get(Map, EnbInfo, #{}),
+    maps:get(Field, M, undefined) =:= Value.
+
+
+%% vim:set ts=4 sw=4 et:
diff --git a/src/osmo_s1gw_sup.erl b/src/osmo_s1gw_sup.erl
index 50bd682..934f040 100644
--- a/src/osmo_s1gw_sup.erl
+++ b/src/osmo_s1gw_sup.erl
@@ -66,6 +66,11 @@
     PfcpLocAddr = osmo_s1gw:get_env(pfcp_loc_addr, ?ENV_DEFAULT_PFCP_LOC_ADDR),
     PfcpRemAddr = osmo_s1gw:get_env(pfcp_rem_addr, ?ENV_DEFAULT_PFCP_REM_ADDR),
 
+    EnbRegistry = {enb_registry, {enb_registry, start_link, []},
+                   permanent,
+                   5000,
+                   worker,
+                   [enb_registry]},
     SctpServer = {sctp_server, {sctp_server, start_link, [server_cfg()]},
                   permanent,
                   5000,
@@ -84,7 +89,7 @@
                [gtpu_kpi]},
 
     s1gw_metrics:init(),
-    {ok, {{one_for_one, 5, 10}, [SctpServer, PfcpPeer, GtpuKpi]}}.
+    {ok, {{one_for_one, 5, 10}, [EnbRegistry, SctpServer, PfcpPeer, GtpuKpi]}}.
 
 
%% ------------------------------------------------------------------ diff --git a/src/s1ap_proxy.erl b/src/s1ap_proxy.erl index a80ce0e..e41d1aa 100644 --- a/src/s1ap_proxy.erl +++ b/src/s1ap_proxy.erl @@ -40,7 +40,7 @@ handle_call/3, handle_cast/2, terminate/2]). --export([start_link/1, +-export([start_link/2, process_pdu/2, fetch_erab/2, fetch_erab_list/1, @@ -73,6 +73,7 @@ MNC :: nonempty_string()}. -record(proxy_state, {conn_info :: sctp_server:conn_info(), + enb_handle :: enb_registry:enb_handle(), erabs :: dict:dict(K :: erab_uid(), V :: pid()), enb_id :: undefined | non_neg_integer(), @@ -94,18 +95,21 @@ }. -export_type([proxy_action/0, - enb_info/0]). + enb_info/0, + enb_id/0, + plmn_id/0]). %% ------------------------------------------------------------------ %% public API %% ------------------------------------------------------------------ --spec start_link(ConnInfo) -> Result - when ConnInfo :: sctp_server:conn_info(), +-spec start_link(EnbHandle, ConnInfo) -> Result + when EnbHandle :: enb_registry:enb_handle(), + ConnInfo :: sctp_server:conn_info(), Result :: gen_server:start_ret(). -start_link(ConnInfo) -> - gen_server:start_link(?MODULE, [ConnInfo], []). +start_link(EnbHandle, ConnInfo) -> + gen_server:start_link(?MODULE, [EnbHandle, ConnInfo], []). -type process_pdu_result() :: {proxy_action(), binary()}. @@ -141,9 +145,10 @@ %% gen_server API %% ------------------------------------------------------------------ -init([ConnInfo]) -> +init([EnbHandle, ConnInfo]) -> process_flag(trap_exit, true), {ok, #proxy_state{enb_uptime = enb_uptime:start_link(), + enb_handle = EnbHandle, conn_info = ConnInfo, erabs = dict:new(), path = []}}. 
@@ -163,15 +168,8 @@ {reply, dict:to_list(ERABs), S}; handle_call(fetch_enb_info, _From, - #proxy_state{enb_id = EnbId, - plmn_id = PLMNId, - genb_id_str = GlobalENBId} = S) -> - Info = #{enb_id => EnbId, - plmn_id => PLMNId, - genb_id_str => GlobalENBId}, - %% omit fields with Value =:= undefined - Reply = maps:filter(fun(_K, V) -> V =/= undefined end, Info), - {reply, Reply, S}; + #proxy_state{} = S) -> + {reply, enb_info(S), S}; handle_call(Info, From, #proxy_state{} = S) -> @@ -267,6 +265,15 @@ MCC ++ "-" ++ MNC ++ "-" ++ integer_to_list(ENBId). +-spec enb_info(proxy_state()) -> enb_info(). +enb_info(S) -> + Info = #{enb_id => S#proxy_state.enb_id, + plmn_id => S#proxy_state.plmn_id, + genb_id_str => S#proxy_state.genb_id_str}, + %% omit fields with Value =:= undefined + maps:filter(fun(_K, V) -> V =/= undefined end, Info). + + %% register a single per-eNB counter -spec ctr_reg(C, GlobalENBId) -> C when C :: [ctr | _], @@ -397,6 +404,8 @@ handle_pdu({successfulOutcome, #'SuccessfulOutcome'{procedureCode = ?'id-S1Setup'}}, S) -> ?LOG_DEBUG("Processing S1 SETUP RESPONSE"), + enb_registry:enb_event(S#proxy_state.enb_handle, + {s1setup, enb_info(S)}), gtpu_kpi_enb_register(S), enb_uptime:genb_id_ind(S#proxy_state.enb_uptime, S#proxy_state.genb_id_str), diff --git a/src/sctp_proxy.erl b/src/sctp_proxy.erl index 830fe91..4ed3cb8 100644 --- a/src/sctp_proxy.erl +++ b/src/sctp_proxy.erl @@ -96,12 +96,15 @@ %% gen_statem API %% ------------------------------------------------------------------ -init([ConnInfo, ConnCfg]) -> - {ok, Pid} = s1ap_proxy:start_link(ConnInfo), +init([EnbConnInfo, MmeConnCfg]) -> + {ok, EnbHandle} = enb_registry:enb_register(), + {ok, Pid} = s1ap_proxy:start_link(EnbHandle, EnbConnInfo), {ok, connecting, - #{enb_aid => maps:get(aid, ConnInfo), - conn_cfg => ConnCfg, + #{enb_aid => maps:get(aid, EnbConnInfo), + enb_conn_info => EnbConnInfo, + mme_conn_cfg => MmeConnCfg, tx_queue => [], + enb_handle => EnbHandle, handler => Pid}}. 
@@ -111,10 +114,13 @@ %% CONNECTING state connecting(enter, OldState, - #{conn_cfg := ConnCfg} = S) -> + #{enb_conn_info := EnbConnInfo, + mme_conn_cfg := MmeConnCfg, + enb_handle := Handle} = S) -> ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]), + ok = enb_registry:enb_event(Handle, {?FUNCTION_NAME, EnbConnInfo}), %% Initiate connection establishment with the MME - {ok, Sock} = sctp_client:connect(ConnCfg), + {ok, Sock} = sctp_client:connect(MmeConnCfg), {next_state, connecting, S#{sock => Sock}, [{state_timeout, 2_000, conn_est_timeout}]}; @@ -155,8 +161,11 @@ %% CONNECTED state -connected(enter, OldState, S0) -> +connected(enter, OldState, + #{enb_handle := Handle} = S0) -> ?LOG_INFO("State change: ~p -> ~p", [OldState, ?FUNCTION_NAME]), + MmeConnInfo = conn_info(?FUNCTION_NAME, S0), + ok = enb_registry:enb_event(Handle, {?FUNCTION_NAME, MmeConnInfo}), %% Send pending eNB -> MME messages (if any) S1 = sctp_send_pending(S0), {keep_state, S1}; @@ -216,13 +225,8 @@ %% Event handler for all states -handle_event(State, {call, From}, fetch_info, - #{handler := Pid, conn_cfg := ConnCfg} = S0) -> - S1 = maps:with([enb_aid, mme_aid], S0), - Info = S1#{state => State, - handler => Pid, - mme_addr => maps:get(raddr, ConnCfg), - mme_port => maps:get(rport, ConnCfg)}, +handle_event(State, {call, From}, fetch_info, S) -> + Info = conn_info(State, S), {keep_state_and_data, {reply, From, Info}}; handle_event(State, {call, From}, EventData, _S) -> @@ -238,8 +242,11 @@ {ok, State, S}. -terminate(Reason, State, #{handler := Pid} = S) -> +terminate(Reason, State, + #{handler := Pid, + enb_handle := Handle} = S) -> ?LOG_NOTICE("Terminating in state ~p, reason ~p", [State, Reason]), + enb_registry:enb_unregister(Handle), s1ap_proxy:shutdown(Pid), case S of #{sock := Sock, @@ -284,4 +291,12 @@ sctp_send_pending([], S) -> S#{tx_queue := []}. 
+ +conn_info(State, #{mme_conn_cfg := MmeConnCfg} = S0) -> + S1 = maps:with([enb_aid, mme_aid, handler], S0), + S1#{state => State, + mme_addr => maps:get(raddr, MmeConnCfg), + mme_port => maps:get(rport, MmeConnCfg)}. + + %% vim:set ts=4 sw=4 et: diff --git a/src/sctp_server.erl b/src/sctp_server.erl index 5530677..d194ce4 100644 --- a/src/sctp_server.erl +++ b/src/sctp_server.erl @@ -68,7 +68,8 @@ }. -export_type([cfg/0, - conn_info/0]). + conn_info/0, + addr_port/0]). -record(client_state, {addr_port :: addr_port(), diff --git a/test/s1ap_proxy_test.erl b/test/s1ap_proxy_test.erl index 9015934..55e9c13 100644 --- a/test/s1ap_proxy_test.erl +++ b/test/s1ap_proxy_test.erl @@ -28,9 +28,13 @@ pfcp_mock:mock_all(), exometer:start(), s1gw_metrics:init(), + enb_registry:start_link(), gtpu_kpi:start_link(#{enable => false}), - {ok, Pid} = s1ap_proxy:start_link(#{addr => {127,0,0,0}, + {ok, EnbHandle} = enb_registry:enb_register(), + {ok, Pid} = s1ap_proxy:start_link(EnbHandle, + #{addr => {127,0,0,0}, port => 1337}), + ok = enb_registry:enb_unregister(EnbHandle), #{handler => Pid}. @@ -38,6 +42,7 @@ s1ap_proxy:shutdown(Pid), exometer:stop(), gtpu_kpi:shutdown(), + enb_registry:shutdown(), pfcp_mock:unmock_all(). -- To view, visit https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/41034?usp=email To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings?usp=email Gerrit-MessageType: newchange Gerrit-Project: erlang/osmo-s1gw Gerrit-Branch: master Gerrit-Change-Id: I77a5a750ca6342da3a99926a44926b3973ab19c4 Gerrit-Change-Number: 41034 Gerrit-PatchSet: 1 Gerrit-Owner: fixeria <vyanits...@sysmocom.de>