Add async flow API mode to test-flow-perf application for improved flow rule insertion performance. The async API allows batching flow rule creation operations and processing completions in bulk, reducing per-rule overhead.
New command line options: --async: enable async flow API mode --async-queue-size=N: size of async queues (default: 1024) --async-push-batch=N: flows to batch before push (default: 256) Signed-off-by: Maxime Peim <[email protected]> --- app/test-flow-perf/actions_gen.c | 172 +++++++++++++ app/test-flow-perf/actions_gen.h | 4 + app/test-flow-perf/async_flow.c | 239 ++++++++++++++++++ app/test-flow-perf/async_flow.h | 41 ++++ app/test-flow-perf/items_gen.c | 13 + app/test-flow-perf/items_gen.h | 4 + app/test-flow-perf/main.c | 410 ++++++++++++++++++++++++------- app/test-flow-perf/meson.build | 1 + 8 files changed, 798 insertions(+), 86 deletions(-) create mode 100644 app/test-flow-perf/async_flow.c create mode 100644 app/test-flow-perf/async_flow.h diff --git a/app/test-flow-perf/actions_gen.c b/app/test-flow-perf/actions_gen.c index 9d102e3af4..af5ed2b30a 100644 --- a/app/test-flow-perf/actions_gen.c +++ b/app/test-flow-perf/actions_gen.c @@ -1165,3 +1165,175 @@ fill_actions(struct rte_flow_action *actions, uint64_t *flow_actions, free(queues); free(hairpin_queues); } + +void +fill_actions_template(struct rte_flow_action *actions, struct rte_flow_action *masks, + uint64_t *flow_actions, bool *need_wire_orig_table) +{ + uint8_t actions_counter = 0; + uint8_t i, j; + + *need_wire_orig_table = false; + + /* Static configurations for actions that need them in templates */ + static struct rte_flow_action_mark mark_conf = { + .id = 1, + }; + static struct rte_flow_action_queue queue_conf = { + .index = 0, + }; + static struct rte_flow_action_port_id port_id_conf = { + .id = 0, + }; + static struct rte_flow_action_jump jump_conf = { + .group = 1, + }; + static struct rte_flow_action_modify_field set_meta_conf = { + .operation = RTE_FLOW_MODIFY_SET, + .dst = + { + .field = RTE_FLOW_FIELD_META, + }, + .src = + { + .field = RTE_FLOW_FIELD_VALUE, + .value = {0, 0, 0, META_DATA}, + }, + .width = 32, + }; + + /* Static mask configurations for each action type */ + static struct 
rte_flow_action_mark mark_mask = { + .id = UINT32_MAX, + }; + static struct rte_flow_action_queue queue_mask = { + .index = UINT16_MAX, + }; + static struct rte_flow_action_jump jump_mask = { + .group = UINT32_MAX, + }; + static struct rte_flow_action_rss rss_mask = { + .level = UINT32_MAX, + .types = UINT64_MAX, + }; + static struct rte_flow_action_set_meta set_meta_mask = { + .data = UINT32_MAX, + .mask = UINT32_MAX, + }; + static struct rte_flow_action_set_tag set_tag_mask = { + .data = UINT32_MAX, + .mask = UINT32_MAX, + .index = UINT8_MAX, + }; + static struct rte_flow_action_port_id port_id_mask = { + .id = UINT32_MAX, + }; + static struct rte_flow_action_count count_mask; + static struct rte_flow_action_set_mac set_mac_mask = { + .mac_addr = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }; + static struct rte_flow_action_set_ipv4 set_ipv4_mask = { + .ipv4_addr = UINT32_MAX, + }; + static struct rte_flow_action_set_ipv6 set_ipv6_mask; + static struct rte_flow_action_set_tp set_tp_mask = { + .port = UINT16_MAX, + }; + static rte_be32_t tcp_seq_ack_mask = UINT32_MAX; + static struct rte_flow_action_set_ttl set_ttl_mask = { + .ttl_value = UINT8_MAX, + }; + static struct rte_flow_action_set_dscp set_dscp_mask = { + .dscp = UINT8_MAX, + }; + static struct rte_flow_action_meter meter_mask = { + .mtr_id = UINT32_MAX, + }; + + /* Initialize ipv6 mask */ + memset(set_ipv6_mask.ipv6_addr.a, 0xff, 16); + + static const struct { + uint64_t flow_mask; + enum rte_flow_action_type type; + const void *action_conf; + const void *action_mask; + const bool need_wire_orig_table; + } template_actions[] = { + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_MARK), RTE_FLOW_ACTION_TYPE_MARK, &mark_conf, + &mark_mask, true}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_COUNT), RTE_FLOW_ACTION_TYPE_COUNT, NULL, + &count_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_MODIFY_FIELD), + RTE_FLOW_ACTION_TYPE_MODIFY_FIELD, &set_meta_conf, &set_meta_mask, false}, + 
{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TAG), RTE_FLOW_ACTION_TYPE_SET_TAG, NULL, + &set_tag_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_FLAG), RTE_FLOW_ACTION_TYPE_FLAG, NULL, NULL, + false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_MAC_SRC), + RTE_FLOW_ACTION_TYPE_SET_MAC_SRC, NULL, &set_mac_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_MAC_DST), + RTE_FLOW_ACTION_TYPE_SET_MAC_DST, NULL, &set_mac_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC), + RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC, NULL, &set_ipv4_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_DST), + RTE_FLOW_ACTION_TYPE_SET_IPV4_DST, NULL, &set_ipv4_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC), + RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC, NULL, &set_ipv6_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_DST), + RTE_FLOW_ACTION_TYPE_SET_IPV6_DST, NULL, &set_ipv6_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TP_SRC), RTE_FLOW_ACTION_TYPE_SET_TP_SRC, + NULL, &set_tp_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TP_DST), RTE_FLOW_ACTION_TYPE_SET_TP_DST, + NULL, &set_tp_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_INC_TCP_ACK), + RTE_FLOW_ACTION_TYPE_INC_TCP_ACK, NULL, &tcp_seq_ack_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK), + RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK, NULL, &tcp_seq_ack_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ), + RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ, NULL, &tcp_seq_ack_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ), + RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ, NULL, &tcp_seq_ack_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TTL), RTE_FLOW_ACTION_TYPE_SET_TTL, NULL, + &set_ttl_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TTL), RTE_FLOW_ACTION_TYPE_DEC_TTL, NULL, + NULL, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP), + RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP, NULL, 
&set_dscp_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP), + RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP, NULL, &set_dscp_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_QUEUE), RTE_FLOW_ACTION_TYPE_QUEUE, + &queue_conf, &queue_mask, true}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_RSS), RTE_FLOW_ACTION_TYPE_RSS, NULL, + &rss_mask, true}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_JUMP), RTE_FLOW_ACTION_TYPE_JUMP, &jump_conf, + &jump_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_PORT_ID), RTE_FLOW_ACTION_TYPE_PORT_ID, + &port_id_conf, &port_id_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DROP), RTE_FLOW_ACTION_TYPE_DROP, NULL, NULL, + false}, + {HAIRPIN_QUEUE_ACTION, RTE_FLOW_ACTION_TYPE_QUEUE, &queue_conf, &queue_mask, false}, + {HAIRPIN_RSS_ACTION, RTE_FLOW_ACTION_TYPE_RSS, NULL, &rss_mask, false}, + {FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_METER), RTE_FLOW_ACTION_TYPE_METER, NULL, + &meter_mask, false}, + }; + + for (j = 0; j < MAX_ACTIONS_NUM; j++) { + if (flow_actions[j] == 0) + break; + for (i = 0; i < RTE_DIM(template_actions); i++) { + if ((flow_actions[j] & template_actions[i].flow_mask) == 0) + continue; + actions[actions_counter].type = template_actions[i].type; + actions[actions_counter].conf = template_actions[i].action_conf; + masks[actions_counter].type = template_actions[i].type; + masks[actions_counter].conf = template_actions[i].action_mask; + *need_wire_orig_table |= template_actions[i].need_wire_orig_table; + actions_counter++; + break; + } + } + + actions[actions_counter].type = RTE_FLOW_ACTION_TYPE_END; + masks[actions_counter].type = RTE_FLOW_ACTION_TYPE_END; +} diff --git a/app/test-flow-perf/actions_gen.h b/app/test-flow-perf/actions_gen.h index 9e13b164f9..7450d45ef7 100644 --- a/app/test-flow-perf/actions_gen.h +++ b/app/test-flow-perf/actions_gen.h @@ -22,4 +22,8 @@ void fill_actions(struct rte_flow_action *actions, uint64_t *flow_actions, uint64_t encap_data, uint64_t decap_data, uint8_t core_idx, bool 
unique_data, uint8_t rx_queues_count, uint16_t dst_port); +/* Fill actions template for async flow API (types only, no values) */ +void fill_actions_template(struct rte_flow_action *actions, struct rte_flow_action *masks, + uint64_t *flow_actions, bool *need_wire_orig_table); + #endif /* FLOW_PERF_ACTION_GEN */ diff --git a/app/test-flow-perf/async_flow.c b/app/test-flow-perf/async_flow.c new file mode 100644 index 0000000000..ba12012c85 --- /dev/null +++ b/app/test-flow-perf/async_flow.c @@ -0,0 +1,239 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright 2026 Mellanox Technologies, Ltd + * + * This file contains the async flow API implementation + * for the flow-perf application. + */ + +#include <stdio.h> +#include <string.h> + +#include <rte_ethdev.h> +#include <rte_flow.h> +#include <rte_malloc.h> + +#include "actions_gen.h" +#include "async_flow.h" +#include "flow_gen.h" +#include "items_gen.h" + +/* Per-port async flow resources */ +static struct async_flow_resources port_resources[MAX_PORTS]; + +int +async_flow_init_port(uint16_t port_id, uint32_t nb_queues, uint32_t queue_size, + uint64_t *flow_items, uint64_t *flow_actions, uint64_t *flow_attrs, + uint8_t flow_group, uint32_t rules_count) +{ + struct rte_flow_port_info port_info = {0}; + struct rte_flow_queue_info queue_info = {0}; + struct rte_flow_error error = {0}; + struct rte_flow_port_attr port_attr = {0}; + struct rte_flow_queue_attr *queue_attr = alloca(sizeof(struct rte_flow_queue_attr)); + const struct rte_flow_queue_attr **queue_attr_list = + alloca(sizeof(struct rte_flow_queue_attr) * nb_queues); + struct rte_flow_pattern_template_attr pt_attr = {0}; + struct rte_flow_actions_template_attr at_attr = {0}; + struct rte_flow_template_table_attr table_attr = {0}; + struct rte_flow_item pattern[MAX_ITEMS_NUM]; + struct rte_flow_action actions[MAX_ACTIONS_NUM]; + struct rte_flow_action action_masks[MAX_ACTIONS_NUM]; + struct async_flow_resources *res; + bool need_wire_orig_table = false; + 
uint32_t i; + int ret; + + if (port_id >= MAX_PORTS) + return -1; + + res = &port_resources[port_id]; + memset(res, 0, sizeof(*res)); + + /* Query port flow info */ + ret = rte_flow_info_get(port_id, &port_info, &queue_info, &error); + if (ret != 0) { + fprintf(stderr, "Port %u: rte_flow_info_get failed: %s\n", port_id, + error.message ? error.message : "(no message)"); + return ret; + } + + /* Limit to device capabilities if reported */ + if (port_info.max_nb_queues != 0 && port_info.max_nb_queues != UINT32_MAX && + nb_queues > port_info.max_nb_queues) + nb_queues = port_info.max_nb_queues; + if (queue_info.max_size != 0 && queue_info.max_size != UINT32_MAX && + queue_size > queue_info.max_size) + queue_size = queue_info.max_size; + + queue_attr->size = queue_size; + for (i = 0; i < nb_queues; i++) + queue_attr_list[i] = queue_attr; + + ret = rte_flow_configure(port_id, &port_attr, nb_queues, queue_attr_list, &error); + if (ret != 0) { + fprintf(stderr, "Port %u: rte_flow_configure failed (ret=%d, type=%d): %s\n", + port_id, ret, error.type, error.message ? error.message : "(no message)"); + return ret; + } + + /* Create pattern template */ + for (i = 0; i < MAX_ATTRS_NUM; i++) { + if (flow_attrs[i] == 0) + break; + if (flow_attrs[i] & INGRESS) + pt_attr.ingress = 1; + else if (flow_attrs[i] & EGRESS) + pt_attr.egress = 1; + else if (flow_attrs[i] & TRANSFER) + pt_attr.transfer = 1; + } + /* Enable relaxed matching for better performance */ + pt_attr.relaxed_matching = 1; + + memset(pattern, 0, sizeof(pattern)); + memset(actions, 0, sizeof(actions)); + memset(action_masks, 0, sizeof(action_masks)); + + fill_items_template(pattern, flow_items, 0, 0); + + res->pattern_template = + rte_flow_pattern_template_create(port_id, &pt_attr, pattern, &error); + if (res->pattern_template == NULL) { + fprintf(stderr, "Port %u: pattern template create failed: %s\n", port_id, + error.message ? 
error.message : "(no message)"); + return -1; + } + + /* Create actions template */ + at_attr.ingress = pt_attr.ingress; + at_attr.egress = pt_attr.egress; + at_attr.transfer = pt_attr.transfer; + + fill_actions_template(actions, action_masks, flow_actions, &need_wire_orig_table); + + res->actions_template = + rte_flow_actions_template_create(port_id, &at_attr, actions, action_masks, &error); + if (res->actions_template == NULL) { + fprintf(stderr, "Port %u: actions template create failed: %s\n", port_id, + error.message ? error.message : "(no message)"); + rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error); + res->pattern_template = NULL; + return -1; + } + + /* Create template table */ + table_attr.flow_attr.group = flow_group; + table_attr.flow_attr.priority = 0; + table_attr.flow_attr.ingress = pt_attr.ingress; + table_attr.flow_attr.egress = pt_attr.egress; + table_attr.flow_attr.transfer = pt_attr.transfer; + table_attr.nb_flows = rules_count; + + if (pt_attr.transfer && need_wire_orig_table) + table_attr.specialize = RTE_FLOW_TABLE_SPECIALIZE_TRANSFER_WIRE_ORIG; + + res->table = rte_flow_template_table_create(port_id, &table_attr, &res->pattern_template, 1, + &res->actions_template, 1, &error); + if (res->table == NULL) { + fprintf(stderr, "Port %u: template table create failed: %s\n", port_id, + error.message ? 
error.message : "(no message)"); + rte_flow_actions_template_destroy(port_id, res->actions_template, &error); + rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error); + res->pattern_template = NULL; + res->actions_template = NULL; + return -1; + } + + res->table_capacity = rules_count; + res->initialized = true; + + printf("Port %u: Async flow engine initialized (queues=%u, queue_size=%u)\n", port_id, + nb_queues, queue_size); + + return 0; +} + +struct rte_flow * +async_generate_flow(uint16_t port_id, uint32_t queue_id, uint64_t *flow_items, + uint64_t *flow_actions, uint32_t counter, uint16_t hairpinq, + uint64_t encap_data, uint64_t decap_data, uint16_t dst_port, uint8_t core_idx, + uint8_t rx_queues_count, bool unique_data, bool postpone, + struct rte_flow_error *error) +{ + struct async_flow_resources *res; + struct rte_flow_item items[MAX_ITEMS_NUM]; + struct rte_flow_action actions[MAX_ACTIONS_NUM]; + struct rte_flow_op_attr op_attr = { + .postpone = postpone, + }; + struct rte_flow *flow; + + if (port_id >= MAX_PORTS) { + rte_flow_error_set(error, EINVAL, RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL, + "Invalid port ID"); + return NULL; + } + + res = &port_resources[port_id]; + if (!res->initialized) { + rte_flow_error_set(error, EINVAL, RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL, + "Async flow resources not initialized"); + return NULL; + } + + /* Fill pattern items with actual values */ + memset(items, 0, sizeof(items)); + fill_items(items, flow_items, counter, core_idx); + + /* Fill actions with actual values */ + memset(actions, 0, sizeof(actions)); + fill_actions(actions, flow_actions, counter, JUMP_ACTION_TABLE, hairpinq, encap_data, + decap_data, core_idx, unique_data, rx_queues_count, dst_port); + + /* Create flow asynchronously */ + flow = rte_flow_async_create(port_id, queue_id, &op_attr, res->table, items, 0, actions, 0, + NULL, error); + + return flow; +} + +void +async_flow_cleanup_port(uint16_t port_id) +{ + struct async_flow_resources 
*res; + struct rte_flow_error error; + struct rte_flow_op_result results[64]; + int ret, i; + + if (port_id >= MAX_PORTS) + return; + + res = &port_resources[port_id]; + if (!res->initialized) + return; + + /* Drain any pending async completions from flow flush */ + for (i = 0; i < 100; i++) { /* Max iterations to avoid infinite loop */ + rte_flow_push(port_id, 0, &error); + ret = rte_flow_pull(port_id, 0, results, 64, &error); + if (ret <= 0) + break; + } + + if (res->table != NULL) { + rte_flow_template_table_destroy(port_id, res->table, &error); + res->table = NULL; + } + + if (res->actions_template != NULL) { + rte_flow_actions_template_destroy(port_id, res->actions_template, &error); + res->actions_template = NULL; + } + + if (res->pattern_template != NULL) { + rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error); + res->pattern_template = NULL; + } + + res->initialized = false; +} diff --git a/app/test-flow-perf/async_flow.h b/app/test-flow-perf/async_flow.h new file mode 100644 index 0000000000..2684fc4156 --- /dev/null +++ b/app/test-flow-perf/async_flow.h @@ -0,0 +1,41 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright 2026 Mellanox Technologies, Ltd + * + * This file contains the async flow API related definitions + * and function declarations. 
+ */ + +#ifndef FLOW_PERF_ASYNC_FLOW +#define FLOW_PERF_ASYNC_FLOW + +#include <rte_flow.h> +#include <stdbool.h> +#include <stdint.h> + +#include "config.h" + +/* Per-port async flow resources */ +struct async_flow_resources { + struct rte_flow_pattern_template *pattern_template; + struct rte_flow_actions_template *actions_template; + struct rte_flow_template_table *table; + uint32_t table_capacity; + bool initialized; +}; + +/* Initialize async flow engine for a port */ +int async_flow_init_port(uint16_t port_id, uint32_t nb_queues, uint32_t queue_size, + uint64_t *flow_items, uint64_t *flow_actions, uint64_t *flow_attrs, + uint8_t flow_group, uint32_t rules_count); + +/* Create a flow rule asynchronously */ +struct rte_flow *async_generate_flow(uint16_t port_id, uint32_t queue_id, uint64_t *flow_items, + uint64_t *flow_actions, uint32_t counter, uint16_t hairpinq, + uint64_t encap_data, uint64_t decap_data, uint16_t dst_port, + uint8_t core_idx, uint8_t rx_queues_count, bool unique_data, + bool postpone, struct rte_flow_error *error); + +/* Cleanup async flow resources for a port */ +void async_flow_cleanup_port(uint16_t port_id); + +#endif /* FLOW_PERF_ASYNC_FLOW */ diff --git a/app/test-flow-perf/items_gen.c b/app/test-flow-perf/items_gen.c index c740e1838f..4f20175f01 100644 --- a/app/test-flow-perf/items_gen.c +++ b/app/test-flow-perf/items_gen.c @@ -389,3 +389,16 @@ fill_items(struct rte_flow_item *items, items[items_counter].type = RTE_FLOW_ITEM_TYPE_END; } + +void +fill_items_template(struct rte_flow_item *items, uint64_t *flow_items, uint32_t outer_ip_src, + uint8_t core_idx) +{ + uint8_t i; + + fill_items(items, flow_items, outer_ip_src, core_idx); + + /* For templates, set spec to NULL - only mask matters for template matching */ + for (i = 0; items[i].type != RTE_FLOW_ITEM_TYPE_END; i++) + items[i].spec = NULL; +} diff --git a/app/test-flow-perf/items_gen.h b/app/test-flow-perf/items_gen.h index f4b0e9a981..50bb4d9fd0 100644 --- 
a/app/test-flow-perf/items_gen.h +++ b/app/test-flow-perf/items_gen.h @@ -15,4 +15,8 @@ void fill_items(struct rte_flow_item *items, uint64_t *flow_items, uint32_t outer_ip_src, uint8_t core_idx); +/* Fill items template for async flow API (masks only, no spec values) */ +void fill_items_template(struct rte_flow_item *items, uint64_t *flow_items, uint32_t outer_ip_src, + uint8_t core_idx); + #endif /* FLOW_PERF_ITEMS_GEN */ diff --git a/app/test-flow-perf/main.c b/app/test-flow-perf/main.c index 6636d1517f..32f2260ba0 100644 --- a/app/test-flow-perf/main.c +++ b/app/test-flow-perf/main.c @@ -37,9 +37,11 @@ #include <rte_mtr.h> #include <rte_os_shim.h> -#include "config.h" #include "actions_gen.h" +#include "async_flow.h" +#include "config.h" #include "flow_gen.h" +#include "rte_common.h" #define MAX_BATCHES_COUNT 100 #define DEFAULT_RULES_COUNT 4000000 @@ -81,6 +83,9 @@ static bool enable_fwd; static bool unique_data; static bool policy_mtr; static bool packet_mode; +static bool async_mode; +static uint32_t async_queue_size = 1024; +static uint32_t async_push_batch = 256; static uint8_t rx_queues_count; static uint8_t tx_queues_count; @@ -598,6 +603,13 @@ usage(char *progname) "Encapped data is fixed with pattern: ether,ipv4,udp,vxlan\n" "With fixed values\n"); printf(" --vxlan-decap: add vxlan_decap action to flow actions\n"); + + printf("\nAsync flow API options:\n"); + printf(" --async: enable async flow API mode\n"); + printf(" --async-queue-size=N: size of each async queue," + " default is 1024\n"); + printf(" --async-push-batch=N: flows to batch before push," + " default is 256\n"); } static void @@ -655,86 +667,90 @@ args_parse(int argc, char **argv) static const struct option lgopts[] = { /* Control */ - { "help", 0, 0, 0 }, - { "rules-count", 1, 0, 0 }, - { "rules-batch", 1, 0, 0 }, - { "dump-iterations", 0, 0, 0 }, - { "deletion-rate", 0, 0, 0 }, - { "query-rate", 0, 0, 0 }, - { "dump-socket-mem", 0, 0, 0 }, - { "enable-fwd", 0, 0, 0 }, - { "unique-data", 
0, 0, 0 }, - { "portmask", 1, 0, 0 }, - { "hairpin-conf", 1, 0, 0 }, - { "cores", 1, 0, 0 }, - { "random-priority", 1, 0, 0 }, - { "meter-profile-alg", 1, 0, 0 }, - { "rxq", 1, 0, 0 }, - { "txq", 1, 0, 0 }, - { "rxd", 1, 0, 0 }, - { "txd", 1, 0, 0 }, - { "mbuf-size", 1, 0, 0 }, - { "mbuf-cache-size", 1, 0, 0 }, - { "total-mbuf-count", 1, 0, 0 }, + {"help", 0, 0, 0}, + {"rules-count", 1, 0, 0}, + {"rules-batch", 1, 0, 0}, + {"dump-iterations", 0, 0, 0}, + {"deletion-rate", 0, 0, 0}, + {"query-rate", 0, 0, 0}, + {"dump-socket-mem", 0, 0, 0}, + {"enable-fwd", 0, 0, 0}, + {"unique-data", 0, 0, 0}, + {"portmask", 1, 0, 0}, + {"hairpin-conf", 1, 0, 0}, + {"cores", 1, 0, 0}, + {"random-priority", 1, 0, 0}, + {"meter-profile-alg", 1, 0, 0}, + {"rxq", 1, 0, 0}, + {"txq", 1, 0, 0}, + {"rxd", 1, 0, 0}, + {"txd", 1, 0, 0}, + {"mbuf-size", 1, 0, 0}, + {"mbuf-cache-size", 1, 0, 0}, + {"total-mbuf-count", 1, 0, 0}, /* Attributes */ - { "ingress", 0, 0, 0 }, - { "egress", 0, 0, 0 }, - { "transfer", 0, 0, 0 }, - { "group", 1, 0, 0 }, + {"ingress", 0, 0, 0}, + {"egress", 0, 0, 0}, + {"transfer", 0, 0, 0}, + {"group", 1, 0, 0}, /* Items */ - { "ether", 0, 0, 0 }, - { "vlan", 0, 0, 0 }, - { "ipv4", 0, 0, 0 }, - { "ipv6", 0, 0, 0 }, - { "tcp", 0, 0, 0 }, - { "udp", 0, 0, 0 }, - { "vxlan", 0, 0, 0 }, - { "vxlan-gpe", 0, 0, 0 }, - { "gre", 0, 0, 0 }, - { "geneve", 0, 0, 0 }, - { "gtp", 0, 0, 0 }, - { "meta", 0, 0, 0 }, - { "tag", 0, 0, 0 }, - { "icmpv4", 0, 0, 0 }, - { "icmpv6", 0, 0, 0 }, + {"ether", 0, 0, 0}, + {"vlan", 0, 0, 0}, + {"ipv4", 0, 0, 0}, + {"ipv6", 0, 0, 0}, + {"tcp", 0, 0, 0}, + {"udp", 0, 0, 0}, + {"vxlan", 0, 0, 0}, + {"vxlan-gpe", 0, 0, 0}, + {"gre", 0, 0, 0}, + {"geneve", 0, 0, 0}, + {"gtp", 0, 0, 0}, + {"meta", 0, 0, 0}, + {"tag", 0, 0, 0}, + {"icmpv4", 0, 0, 0}, + {"icmpv6", 0, 0, 0}, /* Actions */ - { "port-id", 2, 0, 0 }, - { "rss", 0, 0, 0 }, - { "queue", 0, 0, 0 }, - { "jump", 0, 0, 0 }, - { "mark", 0, 0, 0 }, - { "count", 0, 0, 0 }, - { "set-meta", 0, 0, 0 }, - 
{ "set-tag", 0, 0, 0 }, - { "drop", 0, 0, 0 }, - { "hairpin-queue", 1, 0, 0 }, - { "hairpin-rss", 1, 0, 0 }, - { "set-src-mac", 0, 0, 0 }, - { "set-dst-mac", 0, 0, 0 }, - { "set-src-ipv4", 0, 0, 0 }, - { "set-dst-ipv4", 0, 0, 0 }, - { "set-src-ipv6", 0, 0, 0 }, - { "set-dst-ipv6", 0, 0, 0 }, - { "set-src-tp", 0, 0, 0 }, - { "set-dst-tp", 0, 0, 0 }, - { "inc-tcp-ack", 0, 0, 0 }, - { "dec-tcp-ack", 0, 0, 0 }, - { "inc-tcp-seq", 0, 0, 0 }, - { "dec-tcp-seq", 0, 0, 0 }, - { "set-ttl", 0, 0, 0 }, - { "dec-ttl", 0, 0, 0 }, - { "set-ipv4-dscp", 0, 0, 0 }, - { "set-ipv6-dscp", 0, 0, 0 }, - { "flag", 0, 0, 0 }, - { "meter", 0, 0, 0 }, - { "raw-encap", 1, 0, 0 }, - { "raw-decap", 1, 0, 0 }, - { "vxlan-encap", 0, 0, 0 }, - { "vxlan-decap", 0, 0, 0 }, - { "policy-mtr", 1, 0, 0 }, - { "meter-profile", 1, 0, 0 }, - { "packet-mode", 0, 0, 0 }, - { 0, 0, 0, 0 }, + {"port-id", 2, 0, 0}, + {"rss", 0, 0, 0}, + {"queue", 0, 0, 0}, + {"jump", 0, 0, 0}, + {"mark", 0, 0, 0}, + {"count", 0, 0, 0}, + {"set-meta", 0, 0, 0}, + {"set-tag", 0, 0, 0}, + {"drop", 0, 0, 0}, + {"hairpin-queue", 1, 0, 0}, + {"hairpin-rss", 1, 0, 0}, + {"set-src-mac", 0, 0, 0}, + {"set-dst-mac", 0, 0, 0}, + {"set-src-ipv4", 0, 0, 0}, + {"set-dst-ipv4", 0, 0, 0}, + {"set-src-ipv6", 0, 0, 0}, + {"set-dst-ipv6", 0, 0, 0}, + {"set-src-tp", 0, 0, 0}, + {"set-dst-tp", 0, 0, 0}, + {"inc-tcp-ack", 0, 0, 0}, + {"dec-tcp-ack", 0, 0, 0}, + {"inc-tcp-seq", 0, 0, 0}, + {"dec-tcp-seq", 0, 0, 0}, + {"set-ttl", 0, 0, 0}, + {"dec-ttl", 0, 0, 0}, + {"set-ipv4-dscp", 0, 0, 0}, + {"set-ipv6-dscp", 0, 0, 0}, + {"flag", 0, 0, 0}, + {"meter", 0, 0, 0}, + {"raw-encap", 1, 0, 0}, + {"raw-decap", 1, 0, 0}, + {"vxlan-encap", 0, 0, 0}, + {"vxlan-decap", 0, 0, 0}, + {"policy-mtr", 1, 0, 0}, + {"meter-profile", 1, 0, 0}, + {"packet-mode", 0, 0, 0}, + /* Async flow API options */ + {"async", 0, 0, 0}, + {"async-queue-size", 1, 0, 0}, + {"async-push-batch", 1, 0, 0}, + {0, 0, 0, 0}, }; RTE_ETH_FOREACH_DEV(i) @@ -913,14 +929,15 @@ args_parse(int 
argc, char **argv) rte_exit(EXIT_FAILURE, "Invalid hairpin config mask\n"); hairpin_conf_mask = hp_conf; } - if (strcmp(lgopts[opt_idx].name, - "port-id") == 0) { + if (strcmp(lgopts[opt_idx].name, "port-id") == 0) { uint16_t port_idx = 0; - token = strtok(optarg, ","); - while (token != NULL) { - dst_ports[port_idx++] = atoi(token); - token = strtok(NULL, ","); + if (optarg != NULL) { + token = strtok(optarg, ","); + while (token != NULL) { + dst_ports[port_idx++] = atoi(token); + token = strtok(NULL, ","); + } } } if (strcmp(lgopts[opt_idx].name, "rxq") == 0) { @@ -981,6 +998,22 @@ args_parse(int argc, char **argv) } if (strcmp(lgopts[opt_idx].name, "packet-mode") == 0) packet_mode = true; + if (strcmp(lgopts[opt_idx].name, "async") == 0) + async_mode = true; + if (strcmp(lgopts[opt_idx].name, "async-queue-size") == 0) { + n = atoi(optarg); + if (n > 0) + async_queue_size = n; + else + rte_exit(EXIT_FAILURE, "async-queue-size should be > 0\n"); + } + if (strcmp(lgopts[opt_idx].name, "async-push-batch") == 0) { + n = atoi(optarg); + if (n > 0) + async_push_batch = n; + else + rte_exit(EXIT_FAILURE, "async-push-batch should be > 0\n"); + } break; default: usage(argv[0]); @@ -1578,6 +1611,197 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id) return flows_list; } +static inline int +push_pull_flows_async(int port_id, int queue_id, uint64_t enqueued, uint64_t *in_flight, + bool force_push, bool force_pull, bool check_op_status, + struct rte_flow_op_result *results, struct rte_flow_error *error) +{ + /* Keep queue at most 75% full to avoid overflow */ + uint32_t max_in_flight = (async_queue_size * 3) / 4; + int pulled, i; + int ret = 0; + bool do_pull = force_pull || *in_flight >= max_in_flight; + /* If we need to pull, we want all the in flight work to have been pushed */ + bool do_push = do_pull || force_push || (enqueued % async_push_batch) == 0; + + /* Push periodically to give HW work to do */ + if (do_push) { + ret = rte_flow_push(port_id,
queue_id, error); + if (ret) + return ret; + } + + /* Check if queue is getting full, if so push and drain completions */ + if (do_pull) { + do { + pulled = rte_flow_pull(port_id, queue_id, results, async_push_batch, error); + if (pulled < 0) { + return -1; + } else if (pulled == 0) { + rte_pause(); + continue; + } + + *in_flight -= pulled; + if (!check_op_status) + continue; + + for (i = 0; i < pulled; i++) { + if (results[i].status != RTE_FLOW_OP_SUCCESS) { + rte_flow_error_set(error, EINVAL, + RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL, + "Some flow rule insertion failed"); + return -1; + } + } + } while (*in_flight >= max_in_flight); + } + + return 0; +} + +static struct rte_flow ** +insert_flows_async(int port_id, uint8_t core_id, uint16_t dst_port_id) +{ + struct rte_flow **flows_list; + struct rte_flow_error error; + struct rte_flow_op_result *results; + clock_t start_batch, end_batch; + double first_flow_latency; + double cpu_time_used; + double insertion_rate; + double cpu_time_per_batch[MAX_BATCHES_COUNT] = {0}; + double delta; + uint32_t flow_index; + uint32_t counter, start_counter = 0, end_counter; + int rules_batch_idx; + int rules_count_per_core; + uint64_t total_enqueued = 0; + uint64_t in_flight = 0; + uint32_t queue_id = core_id; + + rules_count_per_core = rules_count / mc_pool.cores_count; + + /* Set boundaries of rules for each core. 
*/ + if (core_id) + start_counter = core_id * rules_count_per_core; + end_counter = (core_id + 1) * rules_count_per_core; + + flows_list = rte_zmalloc("flows_list", + (sizeof(struct rte_flow *) * (rules_count_per_core + 1)), 0); + if (flows_list == NULL) + rte_exit(EXIT_FAILURE, "No Memory available!\n"); + + results = rte_zmalloc("results", sizeof(struct rte_flow_op_result) * async_push_batch, 0); + if (results == NULL) { + rte_free(flows_list); + rte_exit(EXIT_FAILURE, "No Memory available!\n"); + } + + cpu_time_used = 0; + flow_index = 0; + if (flow_group > 0 && core_id == 0) { + /* + * Create global rule to jump into flow_group, + * this way the app will avoid the default rules. + * + * This rule will be created only once. + * + * Global rule: + * group 0 eth / end actions jump group <flow_group> + */ + + uint64_t global_items[MAX_ITEMS_NUM] = {0}; + uint64_t global_actions[MAX_ACTIONS_NUM] = {0}; + global_items[0] = FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH); + global_actions[0] = FLOW_ITEM_MASK(RTE_FLOW_ACTION_TYPE_JUMP); + flow = generate_flow(port_id, 0, flow_attrs, global_items, global_actions, + flow_group, 0, 0, 0, 0, dst_port_id, core_id, rx_queues_count, + unique_data, max_priority, &error); + + if (flow == NULL) { + print_flow_error(error); + rte_exit(EXIT_FAILURE, "Error in creating flow\n"); + } + flows_list[flow_index++] = flow; + } + + start_batch = rte_get_timer_cycles(); + for (counter = start_counter; counter < end_counter; counter++) { + if (push_pull_flows_async(port_id, queue_id, total_enqueued, &in_flight, false, + false, false, results, &error)) { + print_flow_error(error); + rte_exit(EXIT_FAILURE, "Error push/pull async operations\n"); + } + + /* Create flow with postpone=true to batch operations */ + flow = async_generate_flow(port_id, queue_id, flow_items, flow_actions, counter, + hairpin_queues_num, encap_data, decap_data, dst_port_id, + core_id, rx_queues_count, unique_data, true, &error); + + if (counter == start_counter) { + 
first_flow_latency = (double)(rte_get_timer_cycles() - start_batch); + first_flow_latency /= rte_get_timer_hz(); + /* In millisecond */ + first_flow_latency *= 1000; + printf(":: First Flow Latency (Async) :: Port %d :: First flow " + "installed in %f milliseconds\n", + port_id, first_flow_latency); + } + + if (force_quit) + break; + + if (!flow) { + print_flow_error(error); + rte_exit(EXIT_FAILURE, "Error in creating async flow\n"); + } + + flows_list[flow_index++] = flow; + total_enqueued++; + in_flight++; + + /* + * Save the insertion rate for rules batch. + * Check if the insertion reached the rules + * batch counter, then save the insertion rate + * for this batch. + */ + if (!((counter + 1) % rules_batch)) { + end_batch = rte_get_timer_cycles(); + delta = (double)(end_batch - start_batch); + rules_batch_idx = ((counter + 1) / rules_batch) - 1; + cpu_time_per_batch[rules_batch_idx] = delta / rte_get_timer_hz(); + cpu_time_used += cpu_time_per_batch[rules_batch_idx]; + start_batch = rte_get_timer_cycles(); + } + } + + if (push_pull_flows_async(port_id, queue_id, total_enqueued, &in_flight, true, true, true, + results, &error)) { + print_flow_error(error); + rte_exit(EXIT_FAILURE, "Error final push/pull async operations\n"); + } + + /* Print insertion rates for all batches */ + if (dump_iterations) + print_rules_batches(cpu_time_per_batch); + + printf(":: Port %d :: Core %d boundaries (Async) :: start @[%d] - end @[%d]\n", port_id, + core_id, start_counter, end_counter - 1); + + /* Insertion rate for all rules in one core */ + insertion_rate = ((double)(rules_count_per_core / cpu_time_used) / 1000); + printf(":: Port %d :: Core %d :: Async rules insertion rate -> %f K Rule/Sec\n", port_id, + core_id, insertion_rate); + printf(":: Port %d :: Core %d :: The time for creating %d async rules is %f seconds\n", + port_id, core_id, rules_count_per_core, cpu_time_used); + + rte_free(results); + mc_pool.flows_record.insertion[port_id][core_id] = cpu_time_used; + return
flows_list; +} + static void flows_handler(uint8_t core_id) { @@ -1602,8 +1826,10 @@ flows_handler(uint8_t core_id) mc_pool.last_alloc[core_id] = (int64_t)dump_socket_mem(stdout); if (has_meter()) meters_handler(port_id, core_id, METER_CREATE); - flows_list = insert_flows(port_id, core_id, - dst_ports[port_idx++]); + if (async_mode) + flows_list = insert_flows_async(port_id, core_id, dst_ports[port_idx++]); + else + flows_list = insert_flows(port_id, core_id, dst_ports[port_idx++]); if (flows_list == NULL) rte_exit(EXIT_FAILURE, "Error: Insertion Failed!\n"); mc_pool.current_alloc[core_id] = (int64_t)dump_socket_mem(stdout); @@ -2212,6 +2438,16 @@ init_port(void) } } + /* Configure async flow engine before device start */ + if (async_mode) { + ret = async_flow_init_port(port_id, mc_pool.cores_count, async_queue_size, + flow_items, flow_actions, flow_attrs, flow_group, + rules_count); + if (ret != 0) + rte_exit(EXIT_FAILURE, "Failed to init async flow on port %d\n", + port_id); + } + ret = rte_eth_dev_start(port_id); if (ret < 0) rte_exit(EXIT_FAILURE, @@ -2291,6 +2527,8 @@ main(int argc, char **argv) RTE_ETH_FOREACH_DEV(port) { rte_flow_flush(port, &error); + if (async_mode) + async_flow_cleanup_port(port); if (rte_eth_dev_stop(port) != 0) printf("Failed to stop device on port %u\n", port); rte_eth_dev_close(port); diff --git a/app/test-flow-perf/meson.build b/app/test-flow-perf/meson.build index e101449e32..2f820a7597 100644 --- a/app/test-flow-perf/meson.build +++ b/app/test-flow-perf/meson.build @@ -3,6 +3,7 @@ sources = files( 'actions_gen.c', + 'async_flow.c', 'flow_gen.c', 'items_gen.c', 'main.c', -- 2.43.0

