neels has uploaded this change for review. ( 
https://gerrit.osmocom.org/c/osmo-upf/+/28244 )


Change subject: add pfcp_endpoint
......................................................................

add pfcp_endpoint

Related: SYS#5599
Change-Id: Ic8d42e201b63064a71b40ca45a5a40e29941e8ac
---
M include/osmocom/pfcp/Makefile.am
A include/osmocom/pfcp/pfcp_endpoint.h
M include/osmocom/pfcp/pfcp_msg.h
M src/libosmo-pfcp/Makefile.am
A src/libosmo-pfcp/pfcp_endpoint.c
5 files changed, 580 insertions(+), 0 deletions(-)



  git pull ssh://gerrit.osmocom.org:29418/osmo-upf refs/changes/44/28244/1

diff --git a/include/osmocom/pfcp/Makefile.am b/include/osmocom/pfcp/Makefile.am
index 7bd5ea3..5d2db19 100644
--- a/include/osmocom/pfcp/Makefile.am
+++ b/include/osmocom/pfcp/Makefile.am
@@ -1,4 +1,5 @@
 pfcp_HEADERS = \
+       pfcp_endpoint.h \
        pfcp_ies_custom.h \
        pfcp_ies_auto.h \
        pfcp_msg.h \
diff --git a/include/osmocom/pfcp/pfcp_endpoint.h 
b/include/osmocom/pfcp/pfcp_endpoint.h
new file mode 100644
index 0000000..fa496dd
--- /dev/null
+++ b/include/osmocom/pfcp/pfcp_endpoint.h
@@ -0,0 +1,103 @@
+/*
+ * (C) 2021-2022 by sysmocom - s.f.m.c. GmbH <[email protected]>
+ * All Rights Reserved.
+ *
+ * Author: Neels Janosch Hofmeyr <[email protected]>
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#pragma once
+
+#include <osmocom/core/socket.h>
+#include <osmocom/core/select.h>
+#include <osmocom/core/tdef.h>
+
+#include <osmocom/pfcp/pfcp_msg.h>
+
+struct osmo_pfcp_endpoint;
+struct osmo_fsm_inst;
+
+#define OSMO_PFCP_TIMER_T1 -22
+#define OSMO_PFCP_TIMER_N1 -23
+#define OSMO_PFCP_TIMER_KEEP_RESP -24
+#define OSMO_PFCP_TIMER_ASSOC_RETRY -26
+#define OSMO_PFCP_TIMER_GRACEFUL_REL -27
+
+extern struct osmo_tdef osmo_pfcp_tdefs[];
+
+/* Callback type for handling a received PFCP message.
+ * Ownership of m remains with the caller / m will be deallocated by the caller.
+ * \param ep  The PFCP endpoint that received and decoded the message.
+ * \param m  The message that was received.
+ * \param req  If m is a PFCP Response to an earlier Request, req is that request message; NULL otherwise. */
+typedef void (*osmo_pfcp_endpoint_cb)(struct osmo_pfcp_endpoint *ep, struct 
osmo_pfcp_msg *m,
+                                     struct osmo_pfcp_msg *req);
+
+/* Send/receive PFCP messages to/from remote PFCP endpoints. */
+struct osmo_pfcp_endpoint {
+       struct {
+               /* Local address */
+               struct osmo_sockaddr local_addr;
+               /* Local PFCP Node ID, as sent in outgoing messages' Node ID IE */
+               struct osmo_pfcp_ie_node_id local_node_id;
+
+               /* Timer definitions to use. See OSMO_PFCP_TIMER_T1, OSMO_PFCP_TIMER_N1 and OSMO_PFCP_TIMER_KEEP_RESP.
+                * osmo_pfcp_endpoint_create() points this at osmo_pfcp_tdefs by default. It is
+                * convenient to add osmo_pfcp_tdefs as one of your program's osmo_tdef_group entries and call
+                * osmo_tdef_vty_init() to expose PFCP timers on the VTY. */
+               const struct osmo_tdef *tdefs;
+       } cfg;
+
+       /* PFCP socket; fd is -1 while the endpoint is not bound. */
+       struct osmo_fd pfcp_fd;
+
+       /* The time at which this endpoint was created, in NTP format: seconds since NTP era 0 (the year 1900), i.e.
+        * the UNIX time plus 2208988800. Sent as Recovery Time Stamp in Heartbeat messages. */
+       uint32_t recovery_time_stamp;
+
+       /* State for determining the next sequence number for transmitting a request message */
+       uint32_t seq_nr_state;
+
+       /* Optional. This function is called just after decoding and before handling the message.
+        * This function may set ctx.peer_fi and ctx.session_fi, used for logging context during message decoding.
+        * The caller may also use these fi pointers to reduce lookup iterations in rx_msg().
+        */
+       osmo_pfcp_endpoint_cb set_msg_ctx;
+
+       /* Callback to receive single incoming PFCP messages from a remote peer, already decoded.
+        * Mandatory: osmo_pfcp_endpoint_bind() refuses to bind while this is NULL. */
+       osmo_pfcp_endpoint_cb rx_msg;
+
+       /* application-private data */
+       void *priv;
+
+       /* All transmitted messages that are still pending, list of osmo_pfcp_queue_entry.
+        * For a transmitted Request message, wait for a matching Response from a remote peer; if none arrives,
+        * retransmit (see OSMO_PFCP_TIMER_N1 and OSMO_PFCP_TIMER_T1).
+        * For a transmitted Response message, keep it in the queue for a fixed amount of time (see
+        * OSMO_PFCP_TIMER_KEEP_RESP). If the peer retransmits
+        * the original Request, do not dispatch the Request, but respond with the queued message directly.
+        */
+       struct llist_head retrans_queue;
+};
+
+struct osmo_pfcp_endpoint *osmo_pfcp_endpoint_create(void *ctx, void *priv);
+int osmo_pfcp_endpoint_bind(struct osmo_pfcp_endpoint *ep);
+void osmo_pfcp_endpoint_close(struct osmo_pfcp_endpoint *ep);
+void osmo_pfcp_endpoint_free(struct osmo_pfcp_endpoint **ep);
+
+int osmo_pfcp_endpoint_tx(struct osmo_pfcp_endpoint *ep, struct osmo_pfcp_msg 
*m);
+int osmo_pfcp_endpoint_tx_data(struct osmo_pfcp_endpoint *ep, struct 
osmo_pfcp_msg *m);
+int osmo_pfcp_endpoint_tx_heartbeat_req(struct osmo_pfcp_endpoint *ep, const 
struct osmo_sockaddr *remote_addr);
+
+void osmo_pfcp_endpoint_invalidate_ctx(struct osmo_pfcp_endpoint *ep, struct 
osmo_fsm_inst *deleted_fi);
diff --git a/include/osmocom/pfcp/pfcp_msg.h b/include/osmocom/pfcp/pfcp_msg.h
index 949800e..2833408 100644
--- a/include/osmocom/pfcp/pfcp_msg.h
+++ b/include/osmocom/pfcp/pfcp_msg.h
@@ -147,6 +147,9 @@
                osmo_pfcp_resp_cb resp_cb;
                void *priv;
        } ctx;
+
+       /* When a message gets encoded, the encoded packet is cached here for 
possible retransmissions. */
+       struct msgb *encoded;
 };

 /* Given a &osmo_pfcp_msg->ies pointer, return the &osmo_pfcp_msg.
diff --git a/src/libosmo-pfcp/Makefile.am b/src/libosmo-pfcp/Makefile.am
index c8accb9..db81648 100644
--- a/src/libosmo-pfcp/Makefile.am
+++ b/src/libosmo-pfcp/Makefile.am
@@ -24,6 +24,7 @@
        $(NULL)

 libosmo_pfcp_a_SOURCES = \
+       pfcp_endpoint.c \
        pfcp_ies_custom.c \
        pfcp_msg.c \
        pfcp_strs.c \
diff --git a/src/libosmo-pfcp/pfcp_endpoint.c b/src/libosmo-pfcp/pfcp_endpoint.c
new file mode 100644
index 0000000..3be2aab
--- /dev/null
+++ b/src/libosmo-pfcp/pfcp_endpoint.c
@@ -0,0 +1,472 @@
+/*
+ * (C) 2021-2022 by sysmocom - s.f.m.c. GmbH <[email protected]>
+ * All Rights Reserved.
+ *
+ * Author: Neels Janosch Hofmeyr <[email protected]>
+ *
+ * SPDX-License-Identifier: GPL-2.0+
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <errno.h>
+#include <unistd.h>
+#include <time.h>
+
+#include <osmocom/core/talloc.h>
+#include <osmocom/core/timer.h>
+#include <osmocom/core/tdef.h>
+
+#include <osmocom/pfcp/pfcp_endpoint.h>
+#include <osmocom/pfcp/pfcp_msg.h>
+
+/*! Entry of pfcp_endpoint message queue of PFCP messages, for re-transmissions. */
+struct osmo_pfcp_queue_entry {
+       /* entry in the endpoint's retrans_queue (one list per endpoint, holding sent requests and responses) */
+       struct llist_head entry;
+       /* back-pointer to the endpoint this entry is queued at */
+       struct osmo_pfcp_endpoint *ep;
+       /* message we have transmitted; made a talloc child of this entry when queued */
+       struct osmo_pfcp_msg *m;
+       /* Timer. For a request: T1, how long to wait for a response before retransmitting. For a response: how long
+        * to keep the response around for answering retransmissions of its request. */
+       struct osmo_timer_list t1;
+       /* N1: number of pending re-transmissions; always zero for a response */
+       unsigned int n1_remaining;
+};
+
+/* Find the queued message that is the counterpart of the received message rx: same sequence_nr, opposite
+ * direction (a queued Request for a received Response, and vice versa).
+ * Matching only opposite directions is important because the remote peer makes its own sequence_nr, which may
+ * collide with ours. As long as Requests only look for Responses and vice versa, the sequence_nr scopes do not
+ * overlap.
+ * \return the first matching queue entry, or NULL if there is none. */
+static struct osmo_pfcp_queue_entry *
+osmo_pfcp_queue_find(struct llist_head *queue, const struct osmo_pfcp_msg *rx)
+{
+       struct osmo_pfcp_queue_entry *candidate;
+
+       llist_for_each_entry(candidate, queue, entry) {
+               const struct osmo_pfcp_msg *queued = candidate->m;
+
+               /* same direction: cannot be the counterpart */
+               if (queued->is_response == rx->is_response)
+                       continue;
+               if (queued->h.sequence_nr != rx->h.sequence_nr)
+                       continue;
+               return candidate;
+       }
+       return NULL;
+}
+
+/* Clean up and deallocate the given osmo_pfcp_queue_entry. Stopping the timer and unlinking from the retrans_queue
+ * happen in osmo_pfcp_queue_destructor(), the talloc destructor set when the entry is queued. */
+static void osmo_pfcp_queue_del(struct osmo_pfcp_queue_entry *qe)
+{
+       /* see also the talloc destructor */
+       talloc_free(qe);
+}
+
+/* talloc destructor for osmo_pfcp_queue_entry: stop the entry's timer and unlink it from the endpoint's
+ * retrans_queue, so that neither a timer callback nor a queue lookup can reach the deallocated entry.
+ * Only referenced via talloc_set_destructor() within this file, hence static.
+ * \return 0 always. */
+static int osmo_pfcp_queue_destructor(struct osmo_pfcp_queue_entry *qe)
+{
+       osmo_timer_del(&qe->t1);
+       llist_del(&qe->entry);
+       return 0;
+}
+
+/* Default PFCP timer definitions, used by osmo_pfcp_endpoint_create() as cfg.tdefs.
+ * The entries -19, -20 and -21 have no OSMO_PFCP_TIMER_* name in pfcp_endpoint.h; the others do.
+ * The list is terminated by an all-zero entry. */
+struct osmo_tdef osmo_pfcp_tdefs[] = {
+       {
+               .T = -19,
+               .default_val = 15,
+               .unit = OSMO_TDEF_S,
+               .desc = "PFCP Heartbeat Request period, how long to wait between issuing requests",
+       },
+       {
+               .T = -20,
+               .default_val = 15,
+               .unit = OSMO_TDEF_S,
+               .desc = "PFCP Heartbeat Response timeout, the time after which to regard a non-responding peer"
+                       " as disconnected",
+       },
+       {
+               .T = -21,
+               .default_val = 15,
+               .unit = OSMO_TDEF_S,
+               .desc = "PFCP peer graceful shutdown timeout, how long to keep the peer's state after a peer"
+                       " requested graceful shutdown",
+       },
+       {
+               .T = OSMO_PFCP_TIMER_T1,
+               .default_val = 3000,
+               .unit = OSMO_TDEF_MS,
+               .desc = "PFCP request timeout, how long after a missing response to retransmit a PFCP request",
+       },
+       {
+               .T = OSMO_PFCP_TIMER_N1,
+               .default_val = 3,
+               .unit = OSMO_TDEF_CUSTOM,
+               .desc = "Number of PFCP request retransmission attempts",
+       },
+       {
+               .T = OSMO_PFCP_TIMER_KEEP_RESP,
+               .default_val = 10000,
+               .unit = OSMO_TDEF_MS,
+               .desc = "PFCP response timeout, how long to keep a response, in case its same request is"
+                       " retransmitted by the peer",
+       },
+       {
+               .T = OSMO_PFCP_TIMER_ASSOC_RETRY,
+               .default_val = 15,
+               .unit = OSMO_TDEF_S,
+               .desc = "Idle time between attempts of PFCP Association Setup (CPF)",
+       },
+       {
+               .T = OSMO_PFCP_TIMER_GRACEFUL_REL,
+               .default_val = 15,
+               .unit = OSMO_TDEF_S,
+               .desc = "PFCP graceful release timeout",
+       },
+       {}
+};
+
+/* Allocate a new, unbound PFCP endpoint.
+ * The endpoint starts out with osmo_pfcp_tdefs as timer definitions, an empty retransmission queue, no socket
+ * (fd == -1) and the current time as recovery time stamp. Before calling osmo_pfcp_endpoint_bind(), the caller
+ * sets up cfg.local_addr and the rx_msg callback.
+ * \param ctx  talloc context to allocate the endpoint from.
+ * \param priv  application-private pointer, stored in ep->priv.
+ * \return the new endpoint, or NULL if allocation failed. */
+struct osmo_pfcp_endpoint *osmo_pfcp_endpoint_create(void *ctx, void *priv)
+{
+       uint32_t unix_time;
+       struct osmo_pfcp_endpoint *ep;
+
+       ep = talloc_zero(ctx, struct osmo_pfcp_endpoint);
+       if (!ep)
+               return NULL;
+
+       ep->priv = priv;
+       ep->cfg.tdefs = osmo_pfcp_tdefs;
+       /* not bound yet */
+       ep->pfcp_fd.fd = -1;
+       INIT_LLIST_HEAD(&ep->retrans_queue);
+
+       /* time() returns seconds since 1970 (UNIX epoch), but the recovery_time_stamp is coded in the NTP format,
+        * which is seconds since 1900, the NTP era 0. 2208988800L is the offset between UNIX epoch and NTP era 0.
+        * TODO: what happens when we enter NTP era 1? Is it sufficient to integer-wrap? */
+       unix_time = time(NULL);
+       ep->recovery_time_stamp = unix_time + 2208988800L;
+       LOGP(DLPFCP, LOGL_NOTICE, "PFCP endpoint: recovery timestamp = 0x%08x (%u seconds since UNIX epoch,"
+            " which is %u seconds since NTP era 0; IETF RFC 5905)\n",
+            ep->recovery_time_stamp, unix_time, ep->recovery_time_stamp);
+
+       return ep;
+}
+
+/* Return N1, the configured number of PFCP request retransmission attempts, from the endpoint's timer
+ * definitions.
+ * NOTE(review): the -1 fallback presumably makes osmo_tdef_get() abort if the timer is missing from cfg.tdefs;
+ * confirm against the libosmocore documentation. */
+static unsigned int ep_n1(struct osmo_pfcp_endpoint *ep)
+{
+       const struct osmo_tdef *tdefs = ep->cfg.tdefs;
+
+       return osmo_tdef_get(tdefs, OSMO_PFCP_TIMER_N1, OSMO_TDEF_CUSTOM, -1);
+}
+
+/* Return T1 in milliseconds: how long to wait for a response before retransmitting a PFCP request, from the
+ * endpoint's timer definitions.
+ * File-local helper like ep_n1(), hence static.
+ * NOTE(review): the -1 fallback presumably makes osmo_tdef_get() abort if the timer is missing from cfg.tdefs;
+ * confirm against the libosmocore documentation. */
+static unsigned int ep_t1(struct osmo_pfcp_endpoint *ep)
+{
+       return osmo_tdef_get(ep->cfg.tdefs, OSMO_PFCP_TIMER_T1, OSMO_TDEF_MS, -1);
+}
+
+/* Return, in milliseconds, how long a sent PFCP response is kept for answering retransmissions of its request,
+ * from the endpoint's timer definitions.
+ * File-local helper like ep_n1(), hence static.
+ * NOTE(review): the -1 fallback presumably makes osmo_tdef_get() abort if the timer is missing from cfg.tdefs;
+ * confirm against the libosmocore documentation. */
+static unsigned int ep_keep_resp(struct osmo_pfcp_endpoint *ep)
+{
+       return osmo_tdef_get(ep->cfg.tdefs, OSMO_PFCP_TIMER_KEEP_RESP, OSMO_TDEF_MS, -1);
+}
+
+static int osmo_pfcp_endpoint_tx_data_no_logging(struct osmo_pfcp_endpoint 
*ep, struct osmo_pfcp_msg *m);
+
+/* T1 has expired for a queued request without a response: retransmit it, unless all retransmission attempts are
+ * used up.
+ * Every transmission, including the last retransmission, is followed by a full T1 wait before giving up, so that
+ * a response to the final attempt can still be matched to the queued request.
+ * \param qe  Queue entry of the unanswered request.
+ * \return true to keep the message in the queue, false for dropping from the queue. */
+static bool pfcp_queue_retrans(struct osmo_pfcp_queue_entry *qe)
+{
+       struct osmo_pfcp_endpoint *endpoint = qe->ep;
+       unsigned int t1_ms = ep_t1(endpoint);
+       struct osmo_pfcp_msg *m = qe->m;
+       int rc;
+
+       /* No more attempts remaining, and the last attempt has had its T1 to be answered: drop from queue. */
+       if (!qe->n1_remaining)
+               return false;
+
+       /* re-transmit */
+       qe->n1_remaining--;
+       OSMO_LOG_PFCP_MSG(m, LOGL_INFO, "re-sending (%u attempts remaining)\n", qe->n1_remaining);
+
+       rc = osmo_pfcp_endpoint_tx_data_no_logging(endpoint, m);
+       /* If encoding or sending failed, do not try again. Drop the queue entry. */
+       if (rc)
+               return false;
+
+       /* Re-schedule timer, keep in queue. osmo_timer_schedule() takes seconds and microseconds, so convert the
+        * millisecond remainder. */
+       osmo_timer_schedule(&qe->t1, t1_ms / 1000, (t1_ms % 1000) * 1000);
+       return true;
+}
+
+/* Timer callback of a queue entry; data is the struct osmo_pfcp_queue_entry.
+ * - For a queued response, the time to keep it for retransmitted requests is over: drop it.
+ * - For a queued request, no response has arrived: retransmit via pfcp_queue_retrans(). If that reports that the
+ *   entry is not to be kept, tell the request's resp_cb (if any) about the failure and drop the entry. */
+static void pfcp_queue_timer_cb(void *data)
+{
+       struct osmo_pfcp_queue_entry *qe = data;
+
+       if (!qe->m->is_response) {
+               /* The request is still here, which means it has not received a response from the remote side.
+                * Retransmit the request; if it stays queued, there is nothing more to do now. */
+               if (pfcp_queue_retrans(qe))
+                       return;
+
+               /* No more retransmissions. */
+               if (qe->m->ctx.resp_cb)
+                       qe->m->ctx.resp_cb(qe->m, NULL, "PFCP retransmissions elapsed, no response received");
+       }
+
+       /* Drop the queue entry: either a response that has waited long enough for retransmissions of its
+        * initiating request, or a request that will not be retransmitted anymore. */
+       osmo_pfcp_queue_del(qe);
+}
+
+/* Directly encode and transmit the message, without storing in the retrans_queue and without logging the
+ * transmission.
+ * The encoded packet is cached in m->encoded, so that a retransmission of the same m sends the same bytes without
+ * encoding again.
+ * \return 0 on success, the nonzero rc of osmo_pfcp_msg_encode() if encoding failed, -EIO if sendto() did not
+ *         send the complete packet. */
+static int osmo_pfcp_endpoint_tx_data_no_logging(struct osmo_pfcp_endpoint *ep, struct osmo_pfcp_msg *m)
+{
+       int sent;
+       int len;
+
+       if (!m->encoded) {
+               int rc;
+
+               /* Allocate msgb as child of the message m, so that when m gets deallocated at the end of
+                * retransmission queueing, the msgb gets deallocated with it. */
+               m->encoded = msgb_alloc_c(m, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx");
+               OSMO_ASSERT(m->encoded);
+
+               rc = osmo_pfcp_msg_encode(m->encoded, m);
+               if (rc) {
+                       /* do not cache a failed encoding */
+                       msgb_free(m->encoded);
+                       m->encoded = NULL;
+                       return rc;
+               }
+       }
+
+       len = msgb_length(m->encoded);
+       sent = sendto(ep->pfcp_fd.fd, msgb_data(m->encoded), len, 0,
+                     (struct sockaddr*)&m->remote_addr, sizeof(m->remote_addr));
+       if (sent == len)
+               return 0;
+
+       OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "sendto() failed: rc = %d != length %u\n", sent, len);
+       return -EIO;
+}
+
+/* Log and transmit the message once, without adding it to the retrans_queue. m remains owned by the caller.
+ * \return 0 on success, nonzero if encoding or sending failed. */
+int osmo_pfcp_endpoint_tx_data(struct osmo_pfcp_endpoint *ep, struct osmo_pfcp_msg *m)
+{
+       int rc;
+
+       OSMO_LOG_PFCP_MSG(m, LOGL_INFO, "sending\n");
+       rc = osmo_pfcp_endpoint_tx_data_no_logging(ep, m);
+       return rc;
+}
+
+/* Send a PFCP Heartbeat Request carrying this endpoint's recovery time stamp to remote_addr.
+ * The request is allocated from OTC_SELECT and sent once via osmo_pfcp_endpoint_tx_data(), i.e. it is not added
+ * to the retrans_queue.
+ * \return 0 on success, nonzero if encoding or sending failed. */
+int osmo_pfcp_endpoint_tx_heartbeat_req(struct osmo_pfcp_endpoint *ep, const struct osmo_sockaddr *remote_addr)
+{
+       struct osmo_pfcp_msg *hb_req;
+
+       hb_req = osmo_pfcp_msg_alloc_tx(OTC_SELECT, remote_addr, NULL, NULL, OSMO_PFCP_MSGT_HEARTBEAT_REQ);
+       hb_req->h.sequence_nr = osmo_pfcp_next_seq_nr(&ep->seq_nr_state);
+       hb_req->ies.heartbeat_req.recovery_time_stamp = ep->recovery_time_stamp;
+
+       return osmo_pfcp_endpoint_tx_data(ep, hb_req);
+}
+
+/* Add a transmitted message to the endpoint's retrans_queue and start its timer.
+ * - A Request is queued to be retransmitted up to N1 times, T1 apart, until a Response arrives. If N1 or T1 is
+ *   zero, waiting for a Response is not possible: the Request's resp_cb (if any) is told so right away.
+ * - A Response is queued for the keep-response time, to answer retransmissions of its Request. Only the
+ *   keep-response time decides whether a Response is queued; N1 and T1 apply to Requests only.
+ * When queued, m becomes a talloc child of the queue entry and is deallocated along with it. When not queued, m is
+ * deallocated here, because the caller has already passed ownership of m to the endpoint.
+ * \return 0 always. */
+static int osmo_pfcp_endpoint_retrans_queue_add(struct osmo_pfcp_endpoint *endpoint, struct osmo_pfcp_msg *m)
+{
+       struct osmo_pfcp_queue_entry *qe;
+       unsigned int n1 = ep_n1(endpoint);
+       unsigned int t1_ms = ep_t1(endpoint);
+       unsigned int keep_resp_ms = ep_keep_resp(endpoint);
+       unsigned int timeout = m->is_response ? keep_resp_ms : t1_ms;
+       bool do_queue;
+
+       LOGP(DLPFCP, LOGL_DEBUG, "retransmit unanswered Requests %u x %ums; keep sent Responses for %ums\n",
+            n1, t1_ms, keep_resp_ms);
+
+       /* If there are no retransmissions or no timeout, it makes no sense to add to the queue. */
+       if (m->is_response)
+               do_queue = (keep_resp_ms != 0);
+       else
+               do_queue = (n1 != 0 && t1_ms != 0);
+
+       if (!do_queue) {
+               if (!m->is_response && m->ctx.resp_cb)
+                       m->ctx.resp_cb(m, NULL, "PFCP timeout is zero, cannot wait for a response");
+               /* Nobody else is going to deallocate m. */
+               osmo_pfcp_msg_free(m);
+               return 0;
+       }
+
+       qe = talloc(endpoint, struct osmo_pfcp_queue_entry);
+       OSMO_ASSERT(qe);
+       *qe = (struct osmo_pfcp_queue_entry){
+               .ep = endpoint,
+               .m = m,
+               .n1_remaining = m->is_response ? 0 : n1,
+       };
+       talloc_steal(qe, m);
+       llist_add_tail(&qe->entry, &endpoint->retrans_queue);
+       /* From here on, deallocating qe stops the timer and unlinks the entry. */
+       talloc_set_destructor(qe, osmo_pfcp_queue_destructor);
+
+       osmo_timer_setup(&qe->t1, pfcp_queue_timer_cb, qe);
+       /* osmo_timer_schedule() takes seconds and microseconds, so convert the millisecond remainder. */
+       osmo_timer_schedule(&qe->t1, timeout / 1000, (timeout % 1000) * 1000);
+       return 0;
+}
+
+/* Transmit a PFCP message and hand it to the retransmission queue.
+ * For a Request, the next sequence number of this endpoint is assigned; a Response keeps its sequence number. If
+ * the message has a Node ID IE, it is set to the endpoint's local Node ID.
+ * On success, return zero, and pass ownership of m to ep. ep deallocates m when all retransmissions are done / a
+ * reply has been received.
+ * On error, return nonzero, and immediately deallocate m; for a Request, its resp_cb (if any) is told about the
+ * failure first. */
+int osmo_pfcp_endpoint_tx(struct osmo_pfcp_endpoint *ep, struct osmo_pfcp_msg *m)
+{
+       struct osmo_pfcp_ie_node_id *node_id_ie;
+       int rc;
+
+       if (!m->is_response)
+               m->h.sequence_nr = osmo_pfcp_next_seq_nr(&ep->seq_nr_state);
+
+       node_id_ie = osmo_pfcp_msg_node_id(m);
+       if (node_id_ie)
+               *node_id_ie = ep->cfg.local_node_id;
+
+       rc = osmo_pfcp_endpoint_tx_data(ep, m);
+       if (!rc) {
+               osmo_pfcp_endpoint_retrans_queue_add(ep, m);
+               return 0;
+       }
+
+       /* transmission failed */
+       if (!m->is_response && m->ctx.resp_cb)
+               m->ctx.resp_cb(m, NULL, "Failed to transmit request");
+       osmo_pfcp_msg_free(m);
+       return rc;
+}
+
+/* Handle one received and decoded PFCP message.
+ * - A Heartbeat Request is answered right away with a Heartbeat Response, and is still dispatched to rx_msg().
+ * - A Request for which a Response is still queued is a retransmission by the peer: send the cached Response
+ *   again and do not dispatch the Request.
+ * - A Response matching a queued Request completes that Request: call the Request's resp_cb (if any), dispatch to
+ *   rx_msg() if there is no resp_cb or it returned 1, and finally drop the Request from the retrans_queue.
+ * - Any other message is dispatched to rx_msg() as a new message.
+ * m remains owned by the caller. */
+static void osmo_pfcp_endpoint_handle_rx(struct osmo_pfcp_endpoint *ep, struct osmo_pfcp_msg *m)
+{
+       struct osmo_pfcp_queue_entry *match;
+       struct osmo_pfcp_msg *req = NULL;
+       bool dispatch_rx = true;
+
+       if (m->h.message_type == OSMO_PFCP_MSGT_HEARTBEAT_REQ) {
+               /* Directly answer with a Heartbeat Response. Still also dispatch the Rx event to the peer, below. */
+               struct osmo_pfcp_msg *hb_resp;
+
+               hb_resp = osmo_pfcp_msg_alloc_tx(OTC_SELECT, NULL, NULL, m, OSMO_PFCP_MSGT_HEARTBEAT_RESP);
+               hb_resp->ies.heartbeat_resp.recovery_time_stamp = ep->recovery_time_stamp;
+               osmo_pfcp_endpoint_tx_data(ep, hb_resp);
+       }
+
+       /* If this is receiving a response, search for matching request that is now completed.
+        * If this is receiving a request, search for a matching response that can be retransmitted.
+        * Either way see if a matching sequence_nr is already in the queue. */
+       match = osmo_pfcp_queue_find(&ep->retrans_queue, m);
+
+       if (match && !m->is_response && match->m->is_response) {
+               /* Got a request, but we have already sent a response to this same request earlier. Retransmit
+                * the same response, and don't dispatch the msg rx. Keep our response queued in case the
+                * request is retransmitted yet another time. */
+
+               /* Populate message context to point at peer and session, if applicable */
+               if (ep->set_msg_ctx)
+                       ep->set_msg_ctx(ep, m, NULL);
+               OSMO_LOG_PFCP_MSG(m, LOGL_INFO, "received retransmission of earlier request\n");
+
+               OSMO_LOG_PFCP_MSG(match->m, LOGL_INFO, "re-sending cached response\n");
+               osmo_pfcp_endpoint_tx_data_no_logging(ep, match->m);
+               return;
+       }
+
+       if (match && m->is_response && !match->m->is_response) {
+               /* Got a response, the original request is now ACKed and can be dropped from the retransmission
+                * queue: see if (req) below. */
+               req = match->m;
+
+               /* Populate message context to point at peer and session, if applicable */
+               if (ep->set_msg_ctx)
+                       ep->set_msg_ctx(ep, m, req);
+
+               OSMO_LOG_PFCP_MSG(m, LOGL_INFO, "received response\n");
+
+               /* Only dispatch the response to rx_msg() when resp_cb() asks for it with rc == 1
+                * (or when there is no resp_cb()). */
+               if (req->ctx.resp_cb && req->ctx.resp_cb(req, m, NULL) != 1) {
+                       dispatch_rx = false;
+                       OSMO_LOG_PFCP_MSG(m, LOGL_DEBUG, "response handled by m->resp_cb(),"
+                                         " not dispatching to rx_msg()\n");
+               }
+       } else {
+               /* Either nothing is queued for this sequence_nr, or we can't tell how the queued message relates
+                * to m, which shouldn't happen in a sane PFCP conversation. Handle as new, unrelated message. */
+               match = NULL;
+               if (ep->set_msg_ctx)
+                       ep->set_msg_ctx(ep, m, NULL);
+               OSMO_LOG_PFCP_MSG(m, LOGL_INFO, "received\n");
+       }
+
+       if (dispatch_rx)
+               ep->rx_msg(ep, m, req);
+       /* The answered request is done, remove it from the queue (which also deallocates req). */
+       if (req)
+               osmo_pfcp_queue_del(match);
+}
+
+/* Call-back for the PFCP socket file descriptor: read one UDP packet, decode each PFCP message it contains and
+ * pass each successfully decoded message to osmo_pfcp_endpoint_handle_rx().
+ * \param ofd  The endpoint's osmo_fd; ofd->data is the struct osmo_pfcp_endpoint.
+ * \param what  Event flags; only OSMO_FD_READ is handled.
+ * \return 0 on success or when there is nothing to read, -ENOMEM if the receive buffer cannot be allocated, -EIO
+ *         if recvfrom() fails or returns no data. */
+static int osmo_pfcp_fd_cb(struct osmo_fd *ofd, unsigned int what)
+{
+       struct osmo_pfcp_endpoint *ep = ofd->data;
+       struct osmo_sockaddr remote;
+       socklen_t remote_len = sizeof(remote);
+       struct msgb *msg;
+       int rc;
+
+       if (!(what & OSMO_FD_READ))
+               return 0;
+
+       /* NOTE(review): msg is allocated from OTC_SELECT and not freed on the early return below; presumably it
+        * is released together with the OTC_SELECT context -- confirm. */
+       msg = msgb_alloc_c(OTC_SELECT, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-rx");
+       if (!msg)
+               return -ENOMEM;
+
+       msg->l3h = msg->tail;
+       rc = recvfrom(ofd->fd, msg->tail, msgb_tailroom(msg), 0, (struct sockaddr *)&remote, &remote_len);
+       if (rc <= 0)
+               return -EIO;
+       msgb_put(msg, rc);
+
+       OSMO_ASSERT(ep->rx_msg);
+
+       /* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h
+        * through the message bundle. */
+       msg->l4h = msg->l3h;
+       while (msgb_l4len(msg)) {
+               struct osmo_gtlv_load tlv;
+               struct osmo_pfcp_msg *m;
+
+               m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, &remote);
+               m->encoded = msg;
+
+               /* A header that fails to decode ends processing of the entire packet. */
+               rc = osmo_pfcp_msg_decode_header(&tlv, m, msg);
+               if (rc < 0)
+                       break;
+               /* advance to the next message in the bundle */
+               msg->l4h += rc;
+
+               /* If errors occurred, they have already been logged on DLPFCP. */
+               rc = osmo_pfcp_msg_decode_tlv(m, &tlv);
+               if (rc == 0)
+                       osmo_pfcp_endpoint_handle_rx(ep, m);
+               osmo_pfcp_msg_free(m);
+       }
+       msgb_free(msg);
+       return 0;
+}
+
+/*! Bind a PFCP endpoint to its configured address (ep->cfg.local_addr).
+ * An already open socket is closed first via osmo_pfcp_endpoint_close(), which also discards all messages in the
+ * retrans_queue. The rx_msg callback must be set before binding.
+ * \return 0 on success, negative on error: -EINVAL if ep->rx_msg is not set, or the negative return value of
+ *         osmo_sock_init_osa_ofd(). */
+int osmo_pfcp_endpoint_bind(struct osmo_pfcp_endpoint *ep)
+{
+       int rc;
+
+       /* close the existing socket, if any */
+       osmo_pfcp_endpoint_close(ep);
+
+       if (ep->rx_msg == NULL) {
+               LOGP(DLPFCP, LOGL_ERROR, "missing rx_msg cb at osmo_pfcp_endpoint\n");
+               return -EINVAL;
+       }
+
+       /* create the new socket, binding to configured local address */
+       ep->pfcp_fd.data = ep;
+       ep->pfcp_fd.cb = osmo_pfcp_fd_cb;
+       rc = osmo_sock_init_osa_ofd(&ep->pfcp_fd, SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL,
+                                   OSMO_SOCK_F_BIND);
+       return (rc < 0) ? rc : 0;
+}
+
+/* Discard all messages in the retrans_queue, and close the PFCP socket if it is open. The endpoint itself stays
+ * allocated. */
+void osmo_pfcp_endpoint_close(struct osmo_pfcp_endpoint *ep)
+{
+       struct osmo_pfcp_queue_entry *qe;
+
+       /* Always take the current head of the queue: deleting an entry unlinks it from the list. */
+       for (;;) {
+               qe = llist_first_entry_or_null(&ep->retrans_queue, struct osmo_pfcp_queue_entry, entry);
+               if (!qe)
+                       break;
+               osmo_pfcp_queue_del(qe);
+       }
+
+       /* no socket open, nothing more to do */
+       if (ep->pfcp_fd.fd == -1)
+               return;
+
+       osmo_fd_unregister(&ep->pfcp_fd);
+       close(ep->pfcp_fd.fd);
+       ep->pfcp_fd.fd = -1;
+}
+
+/* Close and deallocate the endpoint that *ep points at, and set *ep to NULL. Does nothing if *ep is already NULL.
+ * \param ep  Address of the caller's endpoint pointer; must not be NULL itself. */
+void osmo_pfcp_endpoint_free(struct osmo_pfcp_endpoint **ep)
+{
+       struct osmo_pfcp_endpoint *endpoint = *ep;
+
+       if (endpoint) {
+               osmo_pfcp_endpoint_close(endpoint);
+               talloc_free(endpoint);
+               *ep = NULL;
+       }
+}
+
+/* Call osmo_pfcp_msg_invalidate_ctx(deleted_fi) on every osmo_pfcp_msg that is still queued in the endpoint's
+ * retrans_queue.
+ * \param ep  The endpoint whose queued messages to update.
+ * \param deleted_fi  Passed on to osmo_pfcp_msg_invalidate_ctx() for each queued message. */
+void osmo_pfcp_endpoint_invalidate_ctx(struct osmo_pfcp_endpoint *ep, struct osmo_fsm_inst *deleted_fi)
+{
+       struct llist_head *queue = &ep->retrans_queue;
+       struct osmo_pfcp_queue_entry *queued;
+
+       llist_for_each_entry(queued, queue, entry)
+               osmo_pfcp_msg_invalidate_ctx(queued->m, deleted_fi);
+}

--
To view, visit https://gerrit.osmocom.org/c/osmo-upf/+/28244
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings

Gerrit-Project: osmo-upf
Gerrit-Branch: master
Gerrit-Change-Id: Ic8d42e201b63064a71b40ca45a5a40e29941e8ac
Gerrit-Change-Number: 28244
Gerrit-PatchSet: 1
Gerrit-Owner: neels <[email protected]>
Gerrit-MessageType: newchange

Reply via email to