Replace the variable length arrays with a refactored loop so that packets are processed in bursts of up to 32.
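For context, the idea is to bound stack usage in the copy path: instead of sizing scratch arrays by nb_pkts (a VLA), the per-burst helper only ever sees at most a fixed number of packets, and a thin wrapper walks the full array in fixed-size chunks. Below is a minimal standalone sketch of that pattern, for illustration only; the names (BURST_SIZE, process_burst, process_all) are hypothetical and not part of this patch.

#include <stdint.h>
#include <stdio.h>

#define BURST_SIZE 32

/* Hypothetical per-burst worker: caller guarantees n <= BURST_SIZE,
 * so a fixed-size stack buffer is enough and no VLA is needed.
 */
static void
process_burst(const int *items, uint16_t n)
{
	int scratch[BURST_SIZE];

	for (uint16_t i = 0; i < n; i++)
		scratch[i] = items[i];
	printf("processed burst of %u\n", (unsigned int)n);
}

/* Wrapper that splits an arbitrary nb_items into bursts of at most BURST_SIZE. */
static void
process_all(const int *items, uint16_t nb_items)
{
	uint16_t offs = 0;

	while (offs < nb_items) {
		uint16_t n = nb_items - offs;

		if (n > BURST_SIZE)
			n = BURST_SIZE;
		process_burst(&items[offs], n);
		offs += n;
	}
}

int main(void)
{
	int items[100] = { 0 };

	process_all(items, 100);	/* four bursts: 32 + 32 + 32 + 4 */
	return 0;
}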
Signed-off-by: Stephen Hemminger <[email protected]>
Acked-by: Bruce Richardson <[email protected]>
Acked-by: Khadem Ullah <[email protected]>
---
 lib/pdump/meson.build |  2 --
 lib/pdump/rte_pdump.c | 59 ++++++++++++++++++++++++++++---------------
 2 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/lib/pdump/meson.build b/lib/pdump/meson.build
index 553dfdd5e6..da8d51b616 100644
--- a/lib/pdump/meson.build
+++ b/lib/pdump/meson.build
@@ -7,8 +7,6 @@ if is_windows
     subdir_done()
 endif
 
-cflags += no_wvla_cflag
-
 sources = files('rte_pdump.c')
 headers = files('rte_pdump.h')
 deps += ['ethdev', 'bpf', 'pcapng']
diff --git a/lib/pdump/rte_pdump.c b/lib/pdump/rte_pdump.c
index 305325847c..33914d8745 100644
--- a/lib/pdump/rte_pdump.c
+++ b/lib/pdump/rte_pdump.c
@@ -27,6 +27,8 @@ RTE_LOG_REGISTER_DEFAULT(pdump_logtype, NOTICE);
 /* Used for the multi-process communication */
 #define PDUMP_MP "mp_pdump"
 
+#define PDUMP_BURST_SIZE 32
+
 /* Overly generous timeout for secondary to respond */
 #define MP_TIMEOUT_S 5
 
@@ -135,27 +137,22 @@ pdump_cb_release(struct pdump_rxtx_cbs *cbs)
 
 /* Create a clone of mbuf to be placed into ring. */
 static void
-pdump_copy(uint16_t port_id, uint16_t queue,
-	   enum rte_pcapng_direction direction,
-	   struct rte_mbuf **pkts, uint16_t nb_pkts,
-	   const struct pdump_rxtx_cbs *cbs,
-	   struct rte_pdump_stats *stats)
+pdump_copy_burst(uint16_t port_id, uint16_t queue_id,
+		 enum rte_pcapng_direction direction,
+		 struct rte_mbuf **pkts, uint16_t nb_pkts,
+		 const struct pdump_rxtx_cbs *cbs,
+		 struct rte_pdump_stats *stats)
 {
-	unsigned int i;
-	int ring_enq;
-	uint16_t d_pkts = 0;
-	struct rte_mbuf *dup_bufs[nb_pkts];
-	struct rte_ring *ring;
-	struct rte_mempool *mp;
-	struct rte_mbuf *p;
-	uint64_t rcs[nb_pkts];
+	uint16_t d_pkts = 0;				/* number of duplicated packets */
+	struct rte_mbuf *dup_bufs[PDUMP_BURST_SIZE];	/* duplicated packets */
+	uint64_t rcs[PDUMP_BURST_SIZE];			/* filter result */
+
+	RTE_ASSERT(nb_pkts <= PDUMP_BURST_SIZE);
 
 	if (cbs->filter)
 		rte_bpf_exec_burst(cbs->filter, (void **)pkts, rcs, nb_pkts);
 
-	ring = cbs->ring;
-	mp = cbs->mp;
-	for (i = 0; i < nb_pkts; i++) {
+	for (unsigned int i = 0; i < nb_pkts; i++) {
 		/*
 		 * This uses same BPF return value convention as socket filter
 		 * and pcap_offline_filter.
@@ -172,12 +169,12 @@ pdump_copy(uint16_t port_id, uint16_t queue,
 		 * If using pcapng then want to wrap packets
 		 * otherwise a simple copy.
 		 */
+		struct rte_mbuf *p;
 		if (cbs->ver == V2)
-			p = rte_pcapng_copy(port_id, queue,
-					    pkts[i], mp, cbs->snaplen,
+			p = rte_pcapng_copy(port_id, queue_id, pkts[i], cbs->mp, cbs->snaplen,
 					    direction, NULL);
 		else
-			p = rte_pktmbuf_copy(pkts[i], mp, 0, cbs->snaplen);
+			p = rte_pktmbuf_copy(pkts[i], cbs->mp, 0, cbs->snaplen);
 
 		if (unlikely(p == NULL))
 			rte_atomic_fetch_add_explicit(&stats->nombuf, 1, rte_memory_order_relaxed);
@@ -185,9 +182,13 @@ pdump_copy(uint16_t port_id, uint16_t queue,
 			dup_bufs[d_pkts++] = p;
 	}
 
+	if (d_pkts == 0)
+		return;
+
 	rte_atomic_fetch_add_explicit(&stats->accepted, d_pkts, rte_memory_order_relaxed);
 
-	ring_enq = rte_ring_enqueue_burst(ring, (void *)&dup_bufs[0], d_pkts, NULL);
+	unsigned int ring_enq
+		= rte_ring_enqueue_burst(cbs->ring, (void *)&dup_bufs[0], d_pkts, NULL);
 	if (unlikely(ring_enq < d_pkts)) {
 		unsigned int drops = d_pkts - ring_enq;
 
@@ -196,6 +197,24 @@ pdump_copy(uint16_t port_id, uint16_t queue,
 	}
 }
 
+/* Create a clone of mbuf to be placed into ring. */
+static void
+pdump_copy(uint16_t port_id, uint16_t queue_id,
+	   enum rte_pcapng_direction direction,
+	   struct rte_mbuf **pkts, uint16_t nb_pkts,
+	   const struct pdump_rxtx_cbs *cbs,
+	   struct rte_pdump_stats *stats)
+{
+	uint16_t offs = 0;
+
+	do {
+		uint16_t n = RTE_MIN(nb_pkts - offs, PDUMP_BURST_SIZE);
+
+		pdump_copy_burst(port_id, queue_id, direction, &pkts[offs], n, cbs, stats);
+		offs += n;
+	} while (offs < nb_pkts);
+}
+
 static uint16_t
 pdump_rx(uint16_t port, uint16_t queue,
 	 struct rte_mbuf **pkts, uint16_t nb_pkts,
-- 
2.51.0

