fixeria has submitted this change. ( 
https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/38533?usp=email )

Change subject: pfcp_peer: implement Heartbeat Request procedure
......................................................................

pfcp_peer: implement Heartbeat Request procedure

Change-Id: Ie52d58ffcc169abe8e72ea106ab957c12af2e8b9
---
M src/pfcp_peer.erl
1 file changed, 139 insertions(+), 17 deletions(-)

Approvals:
  pespin: Looks good to me, but someone else must approve
  laforge: Looks good to me, but someone else must approve
  Jenkins Builder: Verified
  fixeria: Looks good to me, approved




diff --git a/src/pfcp_peer.erl b/src/pfcp_peer.erl
index cc98c49..9a8709c 100644
--- a/src/pfcp_peer.erl
+++ b/src/pfcp_peer.erl
@@ -43,6 +43,8 @@
          terminate/3]).
 -export([start_link/2,
          seid_alloc/0,
+         heartbeat_req/0,
+         heartbeat_req/1,
          session_establish_req/3,
          session_modify_req/3,
          session_delete_req/1,
@@ -54,8 +56,8 @@

 %% 3GPP TS 29.244, section 4.2 "UDP Header and Port Numbers"
 -define(PFCP_PORT, 8805).
-
 -define(PFCP_SEID_MAX, 16#ffffffffffffffff).
+-define(PFCP_HEARTBEAT_REQ_TIMEOUT, 2000).

 -type pfcp_session_rsp() :: ok | {error, term()}.

@@ -75,6 +77,11 @@
               pfcp_msg/0,
               pfcp_pdu/0]).

+-record(heartbeat_state, {from :: undefined | gen_statem:from(),
+                          seq_nr :: pfcp_seq_nr(),
+                          pid :: pid()
+                         }).
+
 -record(peer_state, {seid :: pfcp_seid(),
                      sock :: gen_udp:socket(),
                      loc_addr :: inet:ip_address(),
@@ -82,7 +89,8 @@
                      loc_rts :: pos_integer(),
                      rem_rts :: undefined | pos_integer(),
                      seq_nr :: pfcp_seq_nr(),
-                     registry :: dict:dict()
+                     registry :: dict:dict(),
+                     heartbeat :: undefined | #heartbeat_state{}
                     }).

 -type peer_state() :: #peer_state{}.
@@ -104,6 +112,18 @@
     gen_statem:call(?MODULE, ?FUNCTION_NAME).


+-spec heartbeat_req() -> ok | {error, term()}.
+heartbeat_req() ->
+    heartbeat_req(block).
+
+-spec heartbeat_req(block | noblock) -> ok | {error, term()}.
+heartbeat_req(block) ->
+    gen_statem:call(?MODULE, ?FUNCTION_NAME);
+
+heartbeat_req(noblock) ->
+    gen_statem:cast(?MODULE, ?FUNCTION_NAME).
+
+
 -spec session_establish_req(pfcp_seid(), list(), list()) -> pfcp_session_rsp().
 session_establish_req(SEID, PDRs, FARs) ->
     gen_statem:call(?MODULE, {?FUNCTION_NAME, SEID, PDRs, FARs}).
@@ -174,8 +194,8 @@
 
 %% Handle incoming PFCP PDU(s)
 connecting(info, {udp, Sock, FromIp, FromPort, Data},
-           #peer_state{sock = Sock} = S) ->
-    PDU = decode_pdu(Data, {FromIp, FromPort}, S),
+           #peer_state{sock = Sock} = S0) ->
+    PDU = decode_pdu(Data, {FromIp, FromPort}, S0),
     case PDU of
         #pfcp{type = association_setup_response,
               ie = #{pfcp_cause := 'Request accepted',
@@ -184,7 +204,7 @@
             s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_RESP_RX),
             s1gw_metrics:ctr_inc(?S1GW_CTR_PFCP_ASSOC_SETUP_RESP_RX_ACK),
             {next_state, connected,
-             S#peer_state{rem_rts = RRTS}};
+             S0#peer_state{rem_rts = RRTS}};
         #pfcp{type = association_setup_response,
               ie = #{pfcp_cause := Cause}} ->
             ?LOG_ERROR("Rx Association Setup Response (~p)", [Cause]),
@@ -195,11 +215,14 @@
         %% A CP function or UP function shall be prepared to receive a 
Heartbeat Request
         %% at any time (even from unknown peers) and it shall reply with a 
Heartbeat Response.
         #pfcp{type = heartbeat_request} ->
-            recv_heartbeat_request(PDU, {FromIp, FromPort}, S),
-            {keep_state, S};
+            {_, S1} = recv_heartbeat_request(PDU, {FromIp, FromPort}, S0),
+            {keep_state, S1};
+        #pfcp{type = heartbeat_response} ->
+            {_, S1} = recv_heartbeat_response(PDU, {FromIp, FromPort}, S0),
+            {keep_state, S1};
         _ ->
             ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]),
-            {keep_state, S}
+            {keep_state, S0}
     end;

 connecting(Event, EventData, S) ->
@@ -232,18 +255,22 @@

 %% Handle incoming PFCP PDU(s)
 connected(info, {udp, Sock, FromIp, FromPort, Data},
-          #peer_state{sock = Sock} = S) ->
-    PDU = decode_pdu(Data, {FromIp, FromPort}, S),
+          #peer_state{sock = Sock} = S0) ->
+    PDU = decode_pdu(Data, {FromIp, FromPort}, S0),
     case PDU of
-        %% 3GPP TS 29.244, 6.2.2.2 Heartbeat Request
         #pfcp{type = heartbeat_request} ->
-            recv_heartbeat_request(PDU, {FromIp, FromPort}, S);
+            {_, S1} = recv_heartbeat_request(PDU, {FromIp, FromPort}, S0),
+            {keep_state, S1};
+        #pfcp{type = heartbeat_response} ->
+            {_, S1} = recv_heartbeat_response(PDU, {FromIp, FromPort}, S0),
+            {keep_state, S1};
         #pfcp{seid = SEID} when is_integer(SEID) ->
-            route_pdu(PDU, S);
+            route_pdu(PDU, S0),
+            {keep_state, S0};
         _ ->
-            ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU])
-    end,
-    {keep_state, S};
+            ?LOG_NOTICE("Rx unexpected PFCP PDU: ~p", [PDU]),
+            {keep_state, S0}
+    end;

 %% Catch-all handler for this state
 connected(Event, EventData, S) ->
@@ -259,6 +286,45 @@
     {keep_state, S1,
      [{reply, From, {ok, SEID}}]};

+%% Heartbeat Req (non-blocking)
+handle_event(_State,
+             cast, heartbeat_req,
+             #peer_state{} = S0) ->
+    {_, S1} = send_heartbeat_request(undefined, S0),
+    {keep_state, S1};
+
+%% Heartbeat Req (blocking)
+handle_event(_State,
+             {call, From}, heartbeat_req,
+             #peer_state{} = S0) ->
+    case send_heartbeat_request(From, S0) of
+        {ok, S1} ->
+            %% postpone reply until we get the Resp
+            {keep_state, S1};
+        {Error, S1} ->
+            {keep_state, S1,
+             [{reply, From, Error}]}
+    end;
+
+%% Heartbeat Req (timeout)
+handle_event(_State,
+             cast, heartbeat_request_watchdog,
+             #peer_state{heartbeat = HB} = S) ->
+    case HB of
+        #heartbeat_state{from = From,
+                         seq_nr = SeqNr} ->
+            ?LOG_NOTICE("Heartbeat Request (SeqNr=~p) timeout", [SeqNr]),
+            if
+                From =/= undefined ->
+                    gen_statem:reply(From, {error, timeout});
+                true ->
+                    ok
+            end,
+            {keep_state, S#peer_state{heartbeat = undefined}};
+        undefined ->
+            {keep_state, S}
+    end;
+
 handle_event(_State,
              {call, From}, {session_establish_req, _SEID, _PDRs, _FARs},
              #peer_state{} = S) ->
@@ -444,6 +510,61 @@

 %% 6.2.2 Heartbeat Procedure
 %% 7.4.2 Heartbeat Messages
+send_heartbeat_request(_From, #peer_state{heartbeat = #heartbeat_state{seq_nr 
= SeqNr}} = S) ->
+    ?LOG_ERROR("Another Heartbeat Request is still pending (SeqNr=~p)", 
[SeqNr]),
+    {{error, heartbeat_req_pending}, S};
+
+send_heartbeat_request(From, #peer_state{heartbeat = undefined,
+                                         seq_nr = SeqNr,
+                                         loc_rts = LRTS} = S0) ->
+    ReqIEs = #{recovery_time_stamp => #recovery_time_stamp{time = LRTS}},
+    ?LOG_INFO("Tx Heartbeat Request (SeqNr=~p): ~p", [SeqNr, ReqIEs]),
+    case send_pdu({heartbeat_request, ReqIEs}, S0) of
+        {ok, S1} ->
+            Pid = spawn(fun heartbeat_request_watchdog/0),
+            HB = #heartbeat_state{from = From,
+                                  seq_nr = SeqNr,
+                                  pid = Pid},
+            {ok, S1#peer_state{heartbeat = HB}};
+        Result ->
+            Result
+    end.
+
+
+heartbeat_request_watchdog() ->
+    receive
+        heartbeat_response ->
+            ok
+    after
+        ?PFCP_HEARTBEAT_REQ_TIMEOUT ->
+            gen_statem:cast(?MODULE, ?FUNCTION_NAME)
+    end.
+
+
+recv_heartbeat_response(#pfcp{type = heartbeat_response,
+                              seq_no = SeqNr,
+                              ie = RspIEs},
+                        {_FromIp, _FromPort},
+                        #peer_state{heartbeat = HB} = S) ->
+    ?LOG_INFO("Rx Heartbeat Response (SeqNr=~p): ~p", [SeqNr, RspIEs]),
+    case HB of
+        #heartbeat_state{from = From,
+                         seq_nr = SeqNr,
+                         pid = Pid} ->
+            if
+                From =/= undefined ->
+                    gen_statem:reply(From, ok);
+                true ->
+                    ok
+            end,
+            Pid ! heartbeat_response,
+            {ok, S#peer_state{heartbeat = undefined}};
+        _ ->
+            ?LOG_NOTICE("Heartbeat Response (SeqNr=~p) was not expected", 
[SeqNr]),
+            {{error, unexpected}, S}
+    end.
+
+
 recv_heartbeat_request(#pfcp{type = heartbeat_request,
                              seq_no = SeqNr,
                              ie = ReqIEs},
@@ -455,6 +576,7 @@
     {Result, _} = send_pdu({heartbeat_response, RspIEs},
                            S#peer_state{seq_nr = SeqNr,
                                         rem_addr = FromIp}),
-    Result.
+    {Result, S}.
+

 %% vim:set ts=4 sw=4 et:

--
To view, visit https://gerrit.osmocom.org/c/erlang/osmo-s1gw/+/38533?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: erlang/osmo-s1gw
Gerrit-Branch: master
Gerrit-Change-Id: Ie52d58ffcc169abe8e72ea106ab957c12af2e8b9
Gerrit-Change-Number: 38533
Gerrit-PatchSet: 2
Gerrit-Owner: fixeria <[email protected]>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: fixeria <[email protected]>
Gerrit-Reviewer: laforge <[email protected]>
Gerrit-Reviewer: pespin <[email protected]>

Reply via email to