Module: deluge
Branch: chunked-sessionproxy-and-gtkui-speedups
Commit: 82fecd7aafa702fa0f0fb03f9193328eaf881f73

Author: Pedro Algarvio <[email protected]>
Date:   Thu Apr 28 20:05:11 2011 +0100

Implemented a "chunked" session proxy.

This "chunked" session proxy will allow a more responsive UI, for the tests 
done, the GTK UI, when one has a huge amount of torrents(>500).
What really happens is that the daemon is queried for chunks of torrent ids at 
a time, and not all torrents at the same time. Once finished, return the result.
Might probably need to also implement yielding the results, although I don't 
know if you can yield results from a deferred.

---

 deluge/ui/sessionproxy.py |  161 ++++++++++++++++++++++++++++-----------------
 1 files changed, 100 insertions(+), 61 deletions(-)

diff --git a/deluge/ui/sessionproxy.py b/deluge/ui/sessionproxy.py
index af485a4..3041630 100644
--- a/deluge/ui/sessionproxy.py
+++ b/deluge/ui/sessionproxy.py
@@ -67,26 +67,39 @@ class SessionProxy(component.Component):
         # Holds the time of the last key update.. {torrent_id: {key1, time, 
...}, ...}
         self.cache_times = {}
 
-        client.register_event_handler("TorrentStateChangedEvent", 
self.on_torrent_state_changed)
-        client.register_event_handler("TorrentRemovedEvent", 
self.on_torrent_removed)
-        client.register_event_handler("TorrentAddedEvent", 
self.on_torrent_added)
+        client.register_event_handler(
+            "TorrentStateChangedEvent", self.on_torrent_state_changed
+        )
+        client.register_event_handler(
+            "TorrentRemovedEvent", self.on_torrent_removed
+        )
+        client.register_event_handler(
+            "TorrentAddedEvent", self.on_torrent_added
+        )
 
     def start(self):
-        def on_torrent_status(status):
-            # Save the time we got the torrent status
-            t = time.time()
-            for key, value in status.iteritems():
-                self.torrents[key] = [t, value]
-                self.cache_times[key] = {}
-                for ikey in value.iterkeys():
-                    self.cache_times[key][ikey] = t
-
-        return client.core.get_torrents_status({}, [], 
True).addCallback(on_torrent_status)
+        def on_get_session_state(torrent_ids):
+            for torrent_ids_chunk in self.__get_list_in_chunks(torrent_ids, 2):
+                torrent_ids_chunk = list(torrent_ids_chunk)
+                print 'querying status for chunk', torrent_ids_chunk
+                d = client.core.get_torrents_status(
+                    {'id': torrent_ids_chunk}, [], True
+                ).addCallback(self.__on_torrents_status)
+            return d.addCallback(
+                self.create_status_dict_from_deferred, torrent_ids, []
+            )
+        return 
client.core.get_session_state().addCallback(on_get_session_state)
 
     def stop(self):
-        client.deregister_event_handler("TorrentStateChangedEvent", 
self.on_torrent_state_changed)
-        client.deregister_event_handler("TorrentRemovedEvent", 
self.on_torrent_removed)
-        client.deregister_event_handler("TorrentAddedEvent", 
self.on_torrent_added)
+        client.deregister_event_handler(
+            "TorrentStateChangedEvent", self.on_torrent_state_changed
+        )
+        client.deregister_event_handler(
+            "TorrentRemovedEvent", self.on_torrent_removed
+        )
+        client.deregister_event_handler(
+            "TorrentAddedEvent", self.on_torrent_added
+        )
         self.torrents = {}
 
     def create_status_dict(self, torrent_ids, keys):
@@ -111,9 +124,15 @@ class SessionProxy(component.Component):
                 ])
             else:
                 sd[torrent_id] = dict(self.torrents[torrent_id][1])
-
+        if len(torrent_ids) == 1:
+            return sd[torrent_ids[0]]
         return sd
 
+    def create_status_dict_from_deferred(self, result, torrent_ids, keys):
+        if not torrent_ids:
+            torrent_ids = result.keys()
+        return self.create_status_dict(torrent_ids, keys)
+
     def get_torrent_status(self, torrent_id, keys):
         """
         Get a status dict for one torrent.
@@ -142,27 +161,23 @@ class SessionProxy(component.Component):
                     self.create_status_dict([torrent_id], keys)[torrent_id]
                 )
             else:
-                d = client.core.get_torrent_status(torrent_id, keys_to_get, 
True)
-                def on_status(result, torrent_id):
-                    t = time.time()
-                    self.torrents[torrent_id][0] = t
-                    self.torrents[torrent_id][1].update(result)
-                    for key in keys_to_get:
-                        self.cache_times[torrent_id][key] = t
-                    return self.create_status_dict([torrent_id], 
keys)[torrent_id]
-                return d.addCallback(on_status, torrent_id)
+                d = client.core.get_torrent_status(
+                    torrent_id, keys_to_get, True
+                ).addCallback(self.__on_torrents_status)
+                return d.addCallback(
+                    self.create_status_dict_from_deferred,
+                    [torrent_id],
+                    keys
+                )
         else:
-            d = client.core.get_torrent_status(torrent_id, keys, True)
-            def on_status(result):
-                if result:
-                    t = time.time()
-                    self.torrents[torrent_id] = (t, result)
-                    self.cache_times[torrent_id] = {}
-                    for key in result:
-                        self.cache_times[torrent_id][key] = t
-
-                return result
-            return d.addCallback(on_status)
+            d = client.core.get_torrent_status(
+                torrent_id, keys, True
+            ).addCallback(self.__on_torrents_status)
+            return d.addCallback(
+                self.create_status_dict_from_deferred,
+                None,
+                keys
+            )
 
     def get_torrents_status(self, filter_dict, keys):
         """
@@ -181,22 +196,8 @@ class SessionProxy(component.Component):
         :rtype: dict
 
         """
-        # Helper functions and callbacks 
---------------------------------------
-        def on_status(result, torrent_ids, keys):
-            # Update the internal torrent status dict with the update values
-            t = time.time()
-            for key, value in result.items():
-                self.torrents[key][0] = t
-                self.torrents[key][1].update(value)
-                for k in value:
-                    self.cache_times[key][k] = t
-
-            # Create the status dict
-            if not torrent_ids:
-                torrent_ids = result.keys()
-
-            return self.create_status_dict(torrent_ids, keys)
 
+        # Helper functions and callbacks 
---------------------------------------
         def find_torrents_to_fetch(torrent_ids):
             to_fetch = []
             t = time.time()
@@ -219,27 +220,48 @@ class SessionProxy(component.Component):
             # We get a list of any torrent_ids with expired status dicts
             to_fetch = find_torrents_to_fetch(self.torrents.keys())
             if to_fetch:
-                d = client.core.get_torrents_status({"id": to_fetch}, keys, 
True)
-                return d.addCallback(on_status, self.torrents.keys(), keys)
-
+                for torrent_ids_chunk in self.__get_list_in_chunks(to_fetch):
+                    d = client.core.get_torrents_status(
+                        {"id": torrent_ids_chunk}, keys, True
+                    ).addCallback(self.__on_torrents_status)
+                return d.addCallback(
+                    self.create_status_dict_from_deferred,
+                    self.torrents.keys(),
+                    keys
+                )
             # Don't need to fetch anything
-            return maybeDeferred(self.create_status_dict, 
self.torrents.keys(), keys)
-
+            return maybeDeferred(
+                self.create_status_dict, self.torrents.keys(), keys
+            )
 
         if len(filter_dict) == 1 and "id" in filter_dict:
             # At this point we should have a filter with just "id" in it
             to_fetch = find_torrents_to_fetch(filter_dict["id"])
             if to_fetch:
-                d = client.core.get_torrents_status({"id": to_fetch}, keys, 
True)
-                return d.addCallback(on_status, filter_dict["id"], keys)
+                for torrent_ids_chunk in self.__get_list_in_chunks(to_fetch):
+                    d = client.core.get_torrents_status(
+                        {"id": torrent_ids_chunk}, keys, True
+                    ).addCallback(self.__on_torrents_status)
+                return d.addCallback(
+                    self.create_status_dict_from_deferred,
+                    to_fetch,
+                    keys
+                )
             else:
                 # Don't need to fetch anything, so just return data from the 
cache
-                return maybeDeferred(self.create_status_dict, 
filter_dict["id"], keys)
+                return maybeDeferred(
+                    self.create_status_dict, filter_dict["id"], keys
+                )
         else:
             # This is a keyworded filter so lets just pass it onto the core
             # XXX: Add more caching here.
             d = client.core.get_torrents_status(filter_dict, keys, True)
-            return d.addCallback(on_status, None, keys)
+            d.addCallback(self.__on_torrents_status)
+            return d.addCallback(
+                self.create_status_dict_from_deferred,
+                None,
+                keys
+            )
 
     def on_torrent_state_changed(self, torrent_id, state):
         if torrent_id in self.torrents:
@@ -259,3 +281,20 @@ class SessionProxy(component.Component):
     def on_torrent_removed(self, torrent_id):
         del self.torrents[torrent_id]
         del self.cache_times[torrent_id]
+
+    def __on_torrents_status(self, status):
+        t = time.time()
+        for key, value in status.items():
+            self.torrents.setdefault(key, [t, value])
+            self.torrents[key][0] = t
+            self.torrents[key][1].update(value)
+            for k in value:
+                self.cache_times.setdefault(key, {}).update({k:t})
+        return status
+
+    def __get_list_in_chunks(self, list_to_chunk, chunk_size=20):
+        """
+        Yield successive n-sized chunks from list_to_chunk.
+        """
+        for i in xrange(0, len(list_to_chunk), chunk_size):
+            yield list_to_chunk[i:i+chunk_size]

-- 
You received this message because you are subscribed to the Google Groups 
"deluge-commit" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to 
[email protected].
For more options, visit this group at 
http://groups.google.com/group/deluge-commit?hl=en.

Reply via email to