fixeria has uploaded this change for review. ( 
https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/41281?usp=email )


Change subject: mme_registry: the MME registry (pool) implementation
......................................................................

mme_registry: the MME registry (pool) implementation

Change-Id: Id5480222439bf93eca2e994b291c619dff823b01
Related: SYS#7052
---
A src/mme_registry.erl
M src/osmo_s1gw_sup.erl
A test/mme_registry_test.erl
3 files changed, 424 insertions(+), 1 deletion(-)



  git pull ssh://gerrit.osmocom.org:29418/erlang/osmo-s1gw 
refs/changes/81/41281/1

diff --git a/src/mme_registry.erl b/src/mme_registry.erl
new file mode 100644
index 0000000..7867bec
--- /dev/null
+++ b/src/mme_registry.erl
@@ -0,0 +1,280 @@
+%% Copyright (C) 2025 by sysmocom - s.f.m.c. GmbH <[email protected]>
+%% Author: Vadim Yanitskiy <[email protected]>
+%%
+%% All Rights Reserved
+%%
+%% SPDX-License-Identifier: AGPL-3.0-or-later
+%%
+%% This program is free software; you can redistribute it and/or modify
+%% it under the terms of the GNU Affero General Public License as
+%% published by the Free Software Foundation; either version 3 of the
+%% License, or (at your option) any later version.
+%%
+%% This program is distributed in the hope that it will be useful,
+%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+%% GNU Affero General Public License for more details.
+%%
+%% You should have received a copy of the GNU Affero General Public License
+%% along with this program.  If not, see <https://www.gnu.org/licenses/>.
+%%
+%% Additional Permission under GNU AGPL version 3 section 7:
+%%
+%% If you modify this Program, or any covered work, by linking or
+%% combining it with runtime libraries of Erlang/OTP as released by
+%% Ericsson on https://www.erlang.org (or a modified version of these
+%% libraries), containing parts covered by the terms of the Erlang Public
+%% License (https://www.erlang.org/EPLICENSE), the licensors of this
+%% Program grant you additional permission to convey the resulting work
+%% without the need to license the runtime libraries of Erlang/OTP under
+%% the GNU Affero General Public License. Corresponding Source for a
+%% non-source form of such a combination shall include the source code
+%% for the parts of the runtime libraries of Erlang/OTP used as well as
+%% that of the covered work.
+
+-module(mme_registry).
+-behaviour(gen_server).
+
+-export([init/1,
+         handle_info/2,
+         handle_call/3,
+         handle_cast/2,
+         terminate/2]).
+-export([start_link/0,
+         mme_register/1,
+         mme_unregister/1,
+         mme_select/0,
+         mme_select/1,
+         fetch_mme_info/1,
+         fetch_mme_list/0,
+         shutdown/0]).
+
+-include_lib("kernel/include/logger.hrl").
+
+-include("s1gw_metrics.hrl").
+-include("s1ap.hrl").
+
+
+-type mme_name() :: string().
+
+-type mme_info() :: #{name := mme_name(), %% unique identifier of this MME
+                      laddr => string() | inet:ip_address(),
+                      raddr := string() | inet:ip_address(),
+                      rport => inet:port_number()
+                     }.
+
+-type mme_list() :: [mme_info()].
+
+-export_type([mme_name/0,
+              mme_info/0]).
+
+
+%% ------------------------------------------------------------------
+%% public API
+%% ------------------------------------------------------------------
+
+%% Start the registry process, locally registered under the module name
+-spec start_link() -> {ok, pid()} | term().
+start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+%% Add an MME to the pool; fails if its name or remote addr/port is taken
+-spec mme_register(mme_info()) -> ok | {error, term()}.
+mme_register(Info) -> gen_server:call(?MODULE, {mme_register, Info}).
+
+
+%% Remove the MME with the given name; fails if it is not registered
+-spec mme_unregister(mme_name()) -> ok | {error, term()}.
+mme_unregister(Name) -> gen_server:call(?MODULE, {mme_unregister, Name}).
+
+
+%% Select an MME without a previous choice: the first pool entry, if any
+-spec mme_select() -> {ok, mme_info()} | error.
+mme_select() -> gen_server:call(?MODULE, {mme_select, undefined}).
+
+
+%% Select the MME following the previously used one (wrapping around)
+-spec mme_select(mme_name()) -> {ok, mme_info()} | error.
+mme_select(Prev) -> gen_server:call(?MODULE, {mme_select, Prev}).
+
+
+%% Look up a registered MME by its name
+-spec fetch_mme_info(mme_name()) -> {ok, mme_info()} | error.
+fetch_mme_info(Name) -> gen_server:call(?MODULE, {fetch_mme_info, Name}).
+
+
+%% Fetch the list of all registered MMEs
+-spec fetch_mme_list() -> mme_list().
+fetch_mme_list() -> gen_server:call(?MODULE, fetch_mme_list).
+
+
+%% Stop the registry process
+-spec shutdown() -> ok.
+shutdown() -> gen_server:stop(?MODULE).
+
+
+%% ------------------------------------------------------------------
+%% gen_server API
+%% ------------------------------------------------------------------
+
+init([]) ->
+    %% the server state is simply the list of MMEs; seed it with the
+    %% entries obtained by mme_list_from_env/0
+    {ok, mme_list_from_env()}.
+
+
+%% Serve the synchronous API requests; the state is the list of MMEs.
+%% Unknown requests are logged and answered with {error, not_implemented}.
+handle_call({mme_register, MmeInfo}, _From, Pool) ->
+    %% the pool is replaced only if the new MME has been accepted
+    case mme_add(MmeInfo, Pool) of
+        {ok, NewPool} -> {reply, ok, NewPool};
+        {error, _} = Error -> {reply, Error, Pool}
+    end;
+
+handle_call({mme_unregister, MmeName}, _From, Pool) ->
+    %% TODO: kill all eNB connections?
+    case mme_del(MmeName, Pool) of
+        {ok, NewPool} -> {reply, ok, NewPool};
+        {error, _} = Error -> {reply, Error, Pool}
+    end;
+
+handle_call({mme_select, OldMmeName}, _From, Pool) ->
+    %% selection is read-only: the pool is handed back unchanged
+    {reply, mme_select(OldMmeName, Pool), Pool};
+
+handle_call({fetch_mme_info, MmeName}, _From, Pool) ->
+    %% lookup by name, yields {ok, MmeInfo} | error
+    {reply, mme_find(MmeName, Pool), Pool};
+
+handle_call(fetch_mme_list, _From, Pool) ->
+    %% the complete pool, as stored
+    {reply, Pool, Pool};
+
+handle_call(Request, From, Pool) ->
+    ?LOG_ERROR("unknown ~p() from ~p: ~p", [?FUNCTION_NAME, From, Request]),
+    {reply, {error, not_implemented}, Pool}.
+
+
+handle_cast(Msg, Pool) ->
+    ?LOG_ERROR("unknown ~p(): ~p", [handle_cast, Msg]),
+    {noreply, Pool}. %% no cast is handled here: log it, keep the pool
+
+
+handle_info(Msg, Pool) ->
+    ?LOG_ERROR("unknown ~p(): ~p", [handle_info, Msg]),
+    {noreply, Pool}. %% stray messages are logged and dropped
+
+
+terminate(Why, _Pool) ->
+    ?LOG_NOTICE("Terminating, reason ~p", [Why]),
+    ok. %% no cleanup is performed
+
+
+%% ------------------------------------------------------------------
+%% private API
+%% ------------------------------------------------------------------
+
+%% Check whether two MME descriptions refer to the same MME.
+%% They match when the names are equal, or when both carry the same
+%% remote address *and* port; anything else is not a match.
+-spec mme_match(mme_info(), map()) -> boolean().
+mme_match(#{name := N}, #{name := N}) -> true;
+mme_match(#{raddr := A, rport := P},
+          #{raddr := A, rport := P}) -> true;
+mme_match(_, _) -> false.
+
+
+%% Normalize an MME description (apply defaults, parse addresses) and
+%% append it to the pool, unless an entry with the same name or the same
+%% remote address/port is already present.
+-spec mme_add(mme_info(), mme_list()) -> {ok, mme_list()} | {error, term()}.
+mme_add(Info, Pool) ->
+    Name = maps:get(name, Info),
+    %% the local address defaults to "::", the remote port to ?S1AP_PORT
+    Local = sctp_common:parse_addr(maps:get(laddr, Info, "::")),
+    Remote = sctp_common:parse_addr(maps:get(raddr, Info)),
+    Port = maps:get(rport, Info, ?S1AP_PORT),
+    Entry = Info#{laddr => Local, raddr => Remote, rport => Port},
+    IsDuplicate = lists:any(fun(Known) -> mme_match(Known, Entry) end, Pool),
+    case IsDuplicate of
+        true ->
+            ?LOG_ERROR("MME (name=~p / ~p:~p) is *already* registered",
+                       [Name, Remote, Port]),
+            {error, already_registered};
+        false ->
+            ?LOG_INFO("MME (name=~p, ~p:~p) registered",
+                      [Name, Remote, Port]),
+            {ok, Pool ++ [Entry]}
+    end.
+
+
+%% Drop the MME with the given name; an unknown name is an error
+-spec mme_del(mme_name(), mme_list()) -> {ok, mme_list()} | {error, term()}.
+mme_del(Name, Pool) ->
+    ByName = fun(Entry) -> mme_match(Entry, #{name => Name}) end,
+    case lists:partition(ByName, Pool) of
+        {[], _} -> %% nothing matched, the pool stays as it is
+            ?LOG_ERROR("MME (name=~p) is *not* registered", [Name]),
+            {error, not_registered};
+        {_Removed, Kept} ->
+            ?LOG_INFO("MME (name=~p) unregistered", [Name]),
+            {ok, Kept}
+    end.
+
+
+%% Pick an MME from the pool: the entry following the old MME, wrapping
+%% around at the end; the first entry if the old MME is unknown.
+-spec mme_select(OldMmeName, MMEs) -> Result
+    when OldMmeName :: undefined | mme_name(),
+         MMEs :: mme_list(),
+         Result :: {ok, mme_info()} | error.
+mme_select(_Prev, []) ->
+    %% nothing to choose from
+    error;
+
+mme_select(_Prev, [Only]) ->
+    %% a single candidate is always the answer
+    {ok, Only};
+
+mme_select(undefined, Pool) ->
+    %% no previous MME given => start from the head of the pool
+    {ok, hd(Pool)};
+
+mme_select(Prev, Pool) ->
+    NotPrev = fun(Entry) -> maps:get(name, Entry) =/= Prev end,
+    %% FromPrev starts at the previous MME (empty if it is not in the pool)
+    {_Before, FromPrev} = lists:splitwith(NotPrev, Pool),
+    case FromPrev of
+        [_, Next | _] ->
+            %% hand out the entry right after the previous MME
+            {ok, Next};
+        _ ->
+            %% previous MME is unknown or was the last entry => first entry
+            {ok, hd(Pool)}
+    end.
+
+
+%% Find an MME by name: {ok, MmeInfo} for the first (and, since mme_add/2
+%% rejects duplicate names, only) match; 'error' if there is no such MME
+-spec mme_find(mme_name(), mme_list()) -> {ok, mme_info()} | error.
+mme_find(MmeName, MMEs) ->
+    case lists:search(fun(E) -> mme_match(E, #{name => MmeName}) end, MMEs) of
+        {value, MmeInfo} -> {ok, MmeInfo};
+        false -> error
+    end.
+
+
+%% Build the initial MME pool from the configured 'mme_pool' entries
+-spec mme_list_from_env() -> mme_list().
+mme_list_from_env() ->
+    lists:foldl(fun mme_add_from_env/2, [], osmo_s1gw:get_env(mme_pool, [])).
+
+
+-spec mme_add_from_env(mme_info(), mme_list()) -> mme_list().
+mme_add_from_env(Info, Acc) ->
+    {ok, Pool} = mme_add(Info, Acc), %% badmatch if the entry is rejected
+    Pool.
+
+
+%% vim:set ts=4 sw=4 et:
diff --git a/src/osmo_s1gw_sup.erl b/src/osmo_s1gw_sup.erl
index 1616fe5..86ecbda 100644
--- a/src/osmo_s1gw_sup.erl
+++ b/src/osmo_s1gw_sup.erl
@@ -73,6 +73,11 @@
                    5000,
                    worker,
                    [enb_registry]},
+    MmeRegistry = {mme_registry, {mme_registry, start_link, []},
+                   permanent,
+                   5000,
+                   worker,
+                   [mme_registry]},
     SctpServer = {sctp_server, {sctp_server, start_link, [server_cfg()]},
                   permanent,
                   5000,
@@ -96,7 +101,13 @@
                   [erf]},

     s1gw_metrics:init(),
-    {ok, {{one_for_one, 5, 10}, [EnbRegistry, SctpServer, PfcpPeer, GtpuKpi, 
RestServer]}}.
+    {ok, {{one_for_one, 5, 10},
+          [EnbRegistry,
+           MmeRegistry,
+           SctpServer,
+           PfcpPeer,
+           GtpuKpi,
+           RestServer]}}.


 %% ------------------------------------------------------------------
diff --git a/test/mme_registry_test.erl b/test/mme_registry_test.erl
new file mode 100644
index 0000000..a327233
--- /dev/null
+++ b/test/mme_registry_test.erl
@@ -0,0 +1,132 @@
+-module(mme_registry_test).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+%% ------------------------------------------------------------------
+%% setup functions
+%% ------------------------------------------------------------------
+
+-define(TC(Fun), {setup,
+                  fun start/0,
+                  fun stop/1,
+                  Fun}).
+
+
+start() ->
+    %% store the initial pool under the 'mme_pool' key, then launch the registry
+    osmo_s1gw:set_env(mme_pool, [#{name => "mme0", raddr => "127.0.0.10"},
+                                 #{name => "mme1", raddr => "127.0.0.11"}]),
+    {ok, _Pid} = mme_registry:start_link().
+
+
+stop(_Started) ->
+    mme_registry:shutdown(). %% the value returned by start/0 is not needed
+
+
+%% ------------------------------------------------------------------
+%% testcase descriptions
+%% ------------------------------------------------------------------
+
+mme_registry_test_() ->
+    %% {Title, Instantiator} pairs; ?TC() wraps each instantiator into a
+    %% setup fixture, so every testcase runs against its own registry
+    %% process (start/0 before, stop/1 after)
+    Cases = [{"Test adding and deleting MMEs", fun test_add_del/1},
+             {"Test fetching MME info", fun test_fetch/1},
+             {"Test MME selection", fun test_select/1},
+             {"Test MME selection (empty pool)", fun test_select_empty/1}],
+    [{Title, ?TC(Fun)} || {Title, Fun} <- Cases].
+
+
+%% ------------------------------------------------------------------
+%% actual testcases
+%% ------------------------------------------------------------------
+
+test_add_del(_) ->
+    [%% start/0 pre-registers "mme0" and "mme1"; adding "mme2" must succeed
+     ?_assertEqual(ok, mme_registry:mme_register(#{name => "mme2",
+                                                   raddr => "127.0.0.12"})),
+     %% rejected: the name "mme2" is already taken
+     ?_assertEqual({error, already_registered},
+                   mme_registry:mme_register(#{name => "mme2",
+                                               raddr => "127.0.0.13"})),
+     %% rejected: same raddr (and default rport) as the registered "mme2"
+     ?_assertEqual({error, already_registered},
+                   mme_registry:mme_register(#{name => "mme3",
+                                               raddr => "127.0.0.12"})),
+     %% every registered MME can be removed exactly once ...
+     ?_assertEqual(ok, mme_registry:mme_unregister("mme0")),
+     ?_assertEqual(ok, mme_registry:mme_unregister("mme1")),
+     ?_assertEqual(ok, mme_registry:mme_unregister("mme2")),
+     %% ... removing it again, or removing an unknown MME, is an error
+     ?_assertEqual({error, not_registered},
+                   mme_registry:mme_unregister("mme0")),
+     ?_assertEqual({error, not_registered},
+                   mme_registry:mme_unregister("mme1")),
+     ?_assertEqual({error, not_registered},
+                   mme_registry:mme_unregister("mme42"))].
+
+
+test_fetch(_) ->
+    [%% start/0 pre-registers "mme0" and "mme1"; add a fully specified "mme2"
+     ?_assertEqual(ok, mme_registry:mme_register(#{name => "mme2",
+                                                   laddr => "192.168.1.100",
+                                                   raddr => "192.168.1.101",
+                                                   rport => 1337})),
+     %% the list keeps registration order, with addresses parsed into tuples
+     ?_assertMatch([#{name := "mme0", raddr := {127,0,0,10}},
+                    #{name := "mme1", raddr := {127,0,0,11}},
+                    #{name := "mme2", raddr := {192,168,1,101}}],
+                   mme_registry:fetch_mme_list()),
+     %% lookup by name; "mme0"/"mme1" had no laddr/rport, so defaults apply
+     ?_assertMatch({ok, #{name := "mme0",
+                          laddr := {0,0,0,0,0,0,0,0}, %% "::"
+                          raddr := {127,0,0,10},
+                          rport := 36412}},
+                   mme_registry:fetch_mme_info("mme0")),
+     ?_assertMatch({ok, #{name := "mme1",
+                          laddr := {0,0,0,0,0,0,0,0}, %% "::"
+                          raddr := {127,0,0,11},
+                          rport := 36412}},
+                   mme_registry:fetch_mme_info("mme1")),
+     ?_assertMatch({ok, #{name := "mme2",
+                          laddr := {192,168,1,100},
+                          raddr := {192,168,1,101},
+                          rport := 1337}},
+                   mme_registry:fetch_mme_info("mme2")),
+     %% a name that was never registered yields 'error'
+     ?_assertEqual(error,
+                   mme_registry:fetch_mme_info("mme3"))].
+
+
+test_select(_) ->
+    [%% start/0 pre-registers "mme0" and "mme1"; append "mme2" to the pool
+     ?_assertEqual(ok, mme_registry:mme_register(#{name => "mme2",
+                                                   raddr => "127.0.0.12"})),
+     %% no old MME given, or an unknown one: fall back to the first entry
+     ?_assertMatch({ok, #{name := "mme0"}},
+                   mme_registry:mme_select()),
+     ?_assertMatch({ok, #{name := "mme0"}},
+                   mme_registry:mme_select("mme3")),
+     %% known old MME: expect its successor, wrapping around after the last
+     ?_assertMatch({ok, #{name := "mme1"}},
+                   mme_registry:mme_select("mme0")),
+     ?_assertMatch({ok, #{name := "mme2"}},
+                   mme_registry:mme_select("mme1")),
+     ?_assertMatch({ok, #{name := "mme0"}},
+                   mme_registry:mme_select("mme2"))].
+
+
+test_select_empty(_) ->
+    %% empty the pool first ("mme0" and "mme1" are pre-registered by start/0)
+    Unregister = [?_assertEqual(ok, mme_registry:mme_unregister(Name))
+                  || Name <- ["mme0", "mme1"]],
+    %% with nothing left to select, every request must yield 'error'
+    SelectAny = [?_assertEqual(error, mme_registry:mme_select())],
+    SelectNext = [?_assertEqual(error, mme_registry:mme_select(Name))
+                  || Name <- ["mme0", "mme1", "mme2"]],
+    Unregister ++ SelectAny ++ SelectNext.
+
+
+%% vim:set ts=4 sw=4 et:

--
To view, visit https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/41281?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: erlang/osmo-s1gw
Gerrit-Branch: master
Gerrit-Change-Id: Id5480222439bf93eca2e994b291c619dff823b01
Gerrit-Change-Number: 41281
Gerrit-PatchSet: 1
Gerrit-Owner: fixeria <[email protected]>

Reply via email to