This is an automated email from the ASF dual-hosted git repository.

xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nuttx.git

commit 5a82e21edba480a188feea1f5f063c33040cad55
Author: xuxingliang <xuxingli...@xiaomi.com>
AuthorDate: Mon Sep 16 20:42:45 2024 +0800

    tools/gdb: use iterator for list
    
    Signed-off-by: xuxingliang <xuxingli...@xiaomi.com>
---
 tools/gdb/lists.py   | 156 +++++++++++++++++++++++++++++++++------------------
 tools/gdb/memdump.py |   8 +--
 tools/gdb/net.py     |  30 +++++-----
 tools/gdb/utils.py   |  13 +++--
 4 files changed, 126 insertions(+), 81 deletions(-)

diff --git a/tools/gdb/lists.py b/tools/gdb/lists.py
index bb0967199c..a6ade107e1 100644
--- a/tools/gdb/lists.py
+++ b/tools/gdb/lists.py
@@ -20,6 +20,8 @@
 #
 ############################################################################
 
+import argparse
+
 import gdb
 import utils
 
@@ -28,31 +30,95 @@ sq_queue_type = utils.lookup_type("sq_queue_t")
 dq_queue_type = utils.lookup_type("dq_queue_t")
 
 
-def list_for_each(head):
-    """Iterate over a list"""
-    if head.type == list_node_type.pointer():
-        head = head.dereference()
-    elif head.type != list_node_type:
-        raise TypeError("Must be struct list_node not {}".format(head.type))
+class NxList:
+    def __init__(self, list, container_type=None, member=None, reverse=False):
+        """Initialize the list iterator. Optionally specify the container type 
and member name."""
 
-    if head["next"] == 0:
-        gdb.write(
-            "list_for_each: Uninitialized list '{}' treated as empty\n".format(
-                head.address
-            )
+        if not list:
+            raise ValueError("The head cannot be None.\n")
+
+        if list.type.code != gdb.TYPE_CODE_PTR:
+            list = list.address  # Make sure list is a pointer.
+
+        if container_type and not member:
+            raise ValueError("Must specify the member name in container.\n")
+
+        self.list = list
+        self.reverse = reverse
+        self.container_type = container_type
+        self.member = member
+        self.current = self._get_first()
+
+    def _get_first(self):
+        """Get the initial node based on the direction of traversal."""
+
+        prev = self.list["prev"]
+        next = self.list["next"]
+
+        first = prev if self.reverse else next
+        return first if first and first != self.list else None
+
+    def _get_next(self, node):
+        #   for(node = (list)->next; node != (list); node = node->next)
+        return node["next"] if node["next"] != self.list else None
+
+    def _get_prev(self, node):
+        #   for(node = (list)->next; node != (list); node = node->prev)
+        return node["prev"] if node["prev"] != self.list else None
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.current is None:
+            raise StopIteration
+
+        node = self.current
+        self.current = self._get_prev(node) if self.reverse else 
self._get_next(node)
+        return (
+            utils.container_of(node, self.container_type, self.member)
+            if self.container_type
+            else node
         )
-        return
 
-    node = head["next"].dereference()
-    while node.address != head.address:
-        yield node.address
-        node = node["next"].dereference()
 
+class NxSQueue(NxList):
+    def __init__(self, list, container_type=None, member=None, reverse=False):
+        """Initialize the singly linked list iterator. Optionally specify the 
container type and member name."""
+        if reverse:
+            raise ValueError(
+                "Reverse iteration is not supported for singly linked lists.\n"
+            )
+        super().__init__(list, container_type, member, reverse)
+
+    def _get_first(self):
+        #   for ((p) = (q)->head; (p) != NULL; (p) = (p)->flink)
+        return self.list["head"] or None
+
+    def _get_next(self, node):
+        # if not node["flink"], then return None, to indicate end of list
+        return node["flink"] or None
+
+
+class NxDQueue(NxList):
+    def __init__(self, list, container_type=None, member=None, reverse=False):
+        """Initialize the doubly linked list iterator. Optionally specify the 
container type and member name."""
+        super().__init__(list, container_type, member, reverse)
+
+    def _get_first(self):
+        head = self.list["head"]
+        tail = self.list["tail"]
 
-def list_for_each_entry(head, gdbtype, member):
-    """Iterate over a list of structs"""
-    for node in list_for_each(head):
-        yield utils.container_of(node, gdbtype, member)
+        first = head if not self.reverse else tail
+        return first or None
+
+    def _get_next(self, node):
+        #   for ((p) = (q)->head; (p) != NULL; (p) = (p)->flink)
+        return node["flink"] or None
+
+    def _get_prev(self, node):
+        #   for ((p) = (q)->tail; (p) != NULL; (p) = (p)->blink)
+        return node["blink"] or None
 
 
 def list_check(head):
@@ -121,25 +187,6 @@ def list_check(head):
             return
 
 
-def sq_for_every(sq, entry=None):
-    """Iterate over a singly linked list from the head or specified entry"""
-    if sq.type == sq_queue_type.pointer():
-        sq = sq.dereference()
-    elif sq.type != sq_queue_type:
-        gdb.write("Must be struct sq_queue not {}".format(sq.type))
-        return
-
-    if sq["head"] == 0:
-        return
-
-    if not entry:
-        entry = sq["head"].dereference()
-
-    while entry.address:
-        yield entry.address
-        entry = entry["flink"].dereference()
-
-
 def sq_is_empty(sq):
     """Check if a singly linked list is empty"""
     if sq.type == sq_queue_type.pointer():
@@ -252,21 +299,20 @@ class ForeachListEntry(gdb.Command):
     def invoke(self, arg, from_tty):
         argv = gdb.string_to_argv(arg)
 
-        if len(argv) != 3:
-            gdb.write(
-                "list_for_every_entry takes three arguments" "head, type, 
member\n"
-            )
-            gdb.write("eg: list_for_every_entry &g_list 'struct type' 'node 
'\n")
+        parser = argparse.ArgumentParser(description="Iterate the items in 
list")
+        parser.add_argument("head", type=str, help="List head")
+        parser.add_argument("type", type=str, help="Container type")
+        parser.add_argument("member", type=str, help="Member name in 
container")
+        try:
+            args = parser.parse_args(argv)
+        except SystemExit:
+            gdb.write("Invalid arguments\n")
             return
 
-        i = 0
-        for entry in list_for_each_entry(
-            gdb.parse_and_eval(argv[0]), gdb.lookup_type(argv[1]).pointer(), 
argv[2]
-        ):
-            gdb.write(f"{i}: ({argv[1]} *){entry}\n")
-            gdb.execute(f"print *({argv[1]} *){entry}")
-            i += 1
-
-
-ListCheck()
-ForeachListEntry()
+        pointer = gdb.parse_and_eval(args.head)
+        container_type = gdb.lookup_type(args.type)
+        member = args.member
+        list = NxList(pointer, container_type, member)
+        for i, entry in enumerate(list):
+            entry = entry.dereference()
+            gdb.write(f"{i}: {entry.format_string(styling=True)}\n")
diff --git a/tools/gdb/memdump.py b/tools/gdb/memdump.py
index 1aa1ec7d5a..483f4ad295 100644
--- a/tools/gdb/memdump.py
+++ b/tools/gdb/memdump.py
@@ -26,7 +26,7 @@ import time
 
 import gdb
 import utils
-from lists import sq_for_every
+from lists import NxSQueue
 from utils import get_long_type, get_symbol_value, lookup_type, read_ulong
 
 MM_ALLOC_BIT = 0x1
@@ -227,7 +227,7 @@ def mempool_foreach(pool):
             yield buf
             nblk -= 1
 
-    for entry in sq_for_every(pool["equeue"]):
+    for entry in NxSQueue(pool["equeue"]):
         nblk = (pool["expandsize"] - sq_entry_type.sizeof) / blocksize
         base = int(entry) - nblk * blocksize
         while nblk > 0:
@@ -354,12 +354,12 @@ class Memdump(gdb.Command):
         """Dump the mempool memory"""
         for pool in mempool_multiple_foreach(mpool):
             if pid == PID_MM_FREE:
-                for entry in sq_for_every(pool["queue"]):
+                for entry in NxSQueue(pool["queue"]):
                     gdb.write("%12u%#*x\n" % (pool["blocksize"], self.align, 
entry))
                     self.aordblks += 1
                     self.uordblks += mempool_realblocksize(pool)
 
-                for entry in sq_for_every(pool["iqueue"]):
+                for entry in NxSQueue(pool["iqueue"]):
                     gdb.write("%12u%#*x\n" % (pool["blocksize"], self.align, 
entry))
                     self.aordblks += 1
                     self.uordblks += mempool_realblocksize(pool)
diff --git a/tools/gdb/net.py b/tools/gdb/net.py
index 424107a2fb..4db70c01ab 100644
--- a/tools/gdb/net.py
+++ b/tools/gdb/net.py
@@ -20,7 +20,7 @@
 
 import gdb
 import utils
-from lists import dq_for_every, sq_for_every
+from lists import NxDQueue, NxSQueue
 
 socket = utils.import_check(
     "socket", errmsg="No socket module found, please try gdb-multiarch 
instead.\n"
@@ -66,29 +66,25 @@ def socket_for_each_entry(proto):
         readahead = conn["readahead"]
     """
 
-    sock_gdbtype = gdb.lookup_type("struct socket_conn_s").pointer()
-    conn_gdbtype = gdb.lookup_type("struct %s_conn_s" % proto).pointer()
-
-    for node in dq_for_every(gdb.parse_and_eval("g_active_%s_connections" % 
proto)):
+    g_active_connections = gdb.parse_and_eval("g_active_%s_connections" % 
proto)
+    for node in NxDQueue(g_active_connections, "struct socket_conn_s", "node"):
+        # udp_conn_s::socket_conn_s sconn
         yield utils.container_of(
-            utils.container_of(
-                node, sock_gdbtype, "node"
-            ),  # struct socket_conn_s::dq_entry_t node
-            conn_gdbtype,
+            node,
+            "struct %s_conn_s" % proto,
             "sconn",
-        )  # udp_conn_s::socket_conn_s sconn
+        )
 
 
 def wrbuffer_inqueue_size(queue=None, protocol="tcp"):
     """Calculate the total size of all iob in the write queue of a udp 
connection"""
 
-    total = 0
-    if queue:
-        wrb_gdbtype = gdb.lookup_type("struct %s_wrbuffer_s" % 
protocol).pointer()
-        for entry in sq_for_every(queue):
-            entry = utils.container_of(entry, wrb_gdbtype, "wb_node")
-            total += entry["wb_iob"]["io_pktlen"]
-    return total
+    if not queue:
+        return 0
+
+    type = "struct %s_wrbuffer_s" % protocol
+    node = "wb_node"
+    return sum(entry["wb_iob"]["io_pktlen"] for entry in NxSQueue(queue, type, 
node))
 
 
 def tcp_ofoseg_bufsize(conn):
diff --git a/tools/gdb/utils.py b/tools/gdb/utils.py
index 171e6e8afe..95378475e6 100644
--- a/tools/gdb/utils.py
+++ b/tools/gdb/utils.py
@@ -81,15 +81,18 @@ def get_long_type():
     return long_type
 
 
-def offset_of(typeobj, field):
+def offset_of(typeobj: gdb.Type, field: str) -> Union[int, None]:
     """Return the offset of a field in a structure"""
-    element = gdb.Value(0).cast(typeobj)
-    return int(str(element[field].address).split()[0], 16)
+    for f in typeobj.fields():
+        if f.name == field:
+            return f.bitpos // 8 if f.bitpos is not None else None
 
+    return None
 
-def container_of(ptr, typeobj, member):
+
+def container_of(ptr: gdb.Value, typeobj: gdb.Type, member: str) -> gdb.Value:
     """Return pointer to containing data structure"""
-    return (ptr.cast(get_long_type()) - offset_of(typeobj, 
member)).cast(typeobj)
+    return gdb.Value(ptr.address - offset_of(typeobj, 
member)).cast(typeobj.pointer())
 
 
 class ContainerOf(gdb.Function):

Reply via email to