Author: dmeyer
Date: Fri Apr 21 19:50:39 2006
New Revision: 1438

Modified:
   trunk/beacon/src/db.py
   trunk/beacon/src/monitor.py

Log:
Add a 'changed' signal to the db for monitoring. Update the monitor
code to use it through a master class so that not every monitor is
checked at the same time. The db also handles add and update in a
cleaner way, avoiding crashes when the db state is not what the
caller expected when calling update or add.


Modified: trunk/beacon/src/db.py
==============================================================================
--- trunk/beacon/src/db.py      (original)
+++ trunk/beacon/src/db.py      Fri Apr 21 19:50:39 2006
@@ -106,6 +106,10 @@
             length = (int, ATTR_SIMPLE),
             content = (str, ATTR_SIMPLE))
 
+        self.signals = {
+            'changed': kaa.notifier.Signal()
+            }
+        
         # commit
         self._db.commit()
 
@@ -188,20 +192,52 @@
 
         t1 = time.time()
         changes = self.changes
+        changed_id = []
         self.changes = []
-        for function, arg1, args, kwargs in changes:
-            # It could be possible that an item is added twice. But this is no
-            # problem because the duplicate will be removed at the
-            # next query. It can also happen that a dir is added because of
-            # _getdir and because of the parser. We can't avoid that but the
-            # db should clean itself up.
+        for function, arg1, kwargs in changes:
+            callback = None
             if 'callback' in kwargs:
                 callback = kwargs['callback']
                 del kwargs['callback']
-                callback(function(arg1, *args, **kwargs))
+            id = arg1
+            result = None
+            if function == 'delete':
+                # delete items and all subitems from the db. The delete function
+                # will return all ids deleted, callbacks are not allowed, so
+                # we can just continue
+                changed_id.extend(self._delete(id))
+                continue
+            if function == 'update':
+                try:
+                    result = self._db.update_object(id, **kwargs)
+                except Exception, e:
+                    log.error('%s not in the db: %s' % (id, e))
+            elif function == 'add':
+                # arg1 is the type, kwargs should contain parent and name, the
+                # result is the return of a query, so it has (type, id)
+                if 'parent' in kwargs and 'name' in kwargs:
+                    # make sure it is not in the db already
+                    name = kwargs['name']
+                    parent = kwargs['parent']
+                    result = self._db.query(name=name, parent=parent)
+                    if result:
+                        log.warning('switch to update for %s in %s' % (name, parent))
+                        # we already have such an item, switch to update mode
+                        result = result[0]
+                        id = result['type'], result['id']
+                        self._db.update_object(id, **kwargs)
+                if not result:
+                    # add object to db
+                    result = self._db.add_object(arg1, **kwargs)
+                    id = result['type'], result['id']
             else:
-                function(arg1, *args, **kwargs)
+                # programming error, this should never happen
+                log.error('unknown change <%s>' % function)
+            changed_id.append(id)
+            if callback and result is not None:
+                callback(result)
         self._db.commit()
+        self.signals['changed'].emit(changed_id)
         log.info('db.commit took %s seconds' % (time.time() - t1))
 
 
@@ -211,12 +247,14 @@
         items as parent (and so on). To avoid internal problems, make sure
         commit is called just after this function is called.
         """
-        log.info('DELETE %s', entry)
+        log.info('delete %s', entry)
         if isinstance(entry, Item):
             entry = entry._beacon_id
+        deleted = [ entry ]
         for child in self._db.query(parent = entry):
-            self._delete((child['type'], child['id']))
+            deleted.extend(self._delete((child['type'], child['id'])))
         self._db.delete_object(entry)
+        return deleted
 
 
     def _query_dir(self, parent):
@@ -266,7 +304,7 @@
                     # no client == server == write access
                     # delete from database by adding it to the internal changes
                     # list. It will be deleted right before the next commit.
-                    self.changes.append((self._delete, i, [], {}))
+                    self.changes.append(('delete', i, {}))
                 # delete
             if f == items[pos]._beacon_name:
                 # same file
@@ -283,7 +321,7 @@
             if not self.client:
                 # no client == server == write access
                 for i in items[pos+1-len(items):]:
-                    self.changes.append((self._delete, i, [], {}))
+                    self.changes.append(('delete', i, {}))
             items = items[:pos+1-len(items)]
 
         if self.changes:
@@ -443,9 +481,8 @@
         Get the object with the given type, name and parent. This function will
         look at the pending commits and also in the database.
         """
-        for func, type, args, kwargs  in self.changes:
-            if func == self._db.add_object and \
-                   'name' in kwargs and kwargs['name'] == name and \
+        for func, type, kwargs  in self.changes:
+            if func == 'add' and 'name' in kwargs and kwargs['name'] == name and \
                    'parent' in kwargs and kwargs['parent'] == parent:
                 self.commit()
                 break
@@ -455,8 +492,7 @@
         return None
         
             
-    def add_object(self, type, metadata=None, beacon_immediately=False,
-                   *args, **kwargs):
+    def add_object(self, type, metadata=None, beacon_immediately=False, **kwargs):
         """
         Add an object to the db. If the keyword 'beacon_immediately' is set, the
         object will be added now and the db will be locked until the next commit.
@@ -470,14 +506,14 @@
 
         if beacon_immediately:
             self.commit()
-            return self._db.add_object(type, *args, **kwargs)
-        self.changes.append((self._db.add_object, type, args, kwargs))
+            return self._db.add_object(type, **kwargs)
+        self.changes.append(('add', type, kwargs))
         if len(self.changes) > MAX_BUFFER_CHANGES:
             self.commit()
 
 
     def update_object(self, (type, id), metadata=None, beacon_immediately=False,
-                      *args, **kwargs):
+                      **kwargs):
         """
         Update an object to the db. If the keyword 'beacon_immediately' is set, the
         object will be updated now and the db will be locked until the next commit.
@@ -489,13 +525,13 @@
                 if metadata.has_key(key) and metadata[key] != None:
                     kwargs[key] = metadata[key]
 
-        self.changes.append((self._db.update_object, (type, id), args, kwargs))
+        self.changes.append(('update', (type, id), kwargs))
         if len(self.changes) > MAX_BUFFER_CHANGES or beacon_immediately:
             self.commit()
 
 
     def delete_object(self, (type, id), beacon_immediately=False):
-        self.changes.append((self._delete, (type, id), [], {}))
+        self.changes.append(('delete', (type, id), {}))
         if len(self.changes) > MAX_BUFFER_CHANGES or beacon_immediately:
             self.commit()
             

Modified: trunk/beacon/src/monitor.py
==============================================================================
--- trunk/beacon/src/monitor.py (original)
+++ trunk/beacon/src/monitor.py Fri Apr 21 19:50:39 2006
@@ -38,7 +38,7 @@
 
 # kaa imports
 from kaa.weakref import weakref
-from kaa.notifier import WeakTimer, Timer, execute_in_timer, Callback
+from kaa.notifier import WeakTimer, WeakOneShotTimer, Timer, execute_in_timer, Callback
 
 # kaa.beacon imports
 import parser
@@ -57,12 +57,39 @@
         self.remote(self.id, __ipc_oneway=True, __ipc_noproxy_args=True, *args, **kwargs)
 
 
+class Master(object):
+    def __init__(self, db):
+        self.monitors = []
+        self.timer = Timer(self.check)
+        db.signals['changed'].connect(self.changed)
+        
+    def connect(self, monitor):
+        self.monitors.append((weakref(monitor), []))
+        
+    def changed(self, changes):
+        for m, c in self.monitors:
+            c.extend(changes)
+        if not self.timer.active():
+            self.timer.start(0.02)
+
+    def check(self):
+        if not self.monitors:
+            return False
+        monitor, changes = self.monitors.pop(0)
+        if monitor == None:
+            return True
+        if changes:
+            monitor.check(changes)
+        self.monitors.append((monitor, []))
+
+_master = None
+
 class Monitor(object):
     """
     Monitor query for changes and call callback.
     """
-
     def __init__(self, callback, db, server, id, query):
+        global _master
         log.debug('create new monitor %s' % id)
         self.id = id
         self.callback = Notification(callback, self.id)
@@ -71,59 +98,65 @@
         self._query = query
         self._checker = None
         self.items = self._db.query(**self._query)
+        if not _master:
+            _master = Master(db)
+        _master.connect(self)
         if self.items and isinstance(self.items[0], Item):
             self._scan(True)
-        self._poll()
 
-#         if self._query.has_key('dirname') and \
-#            (not self._query.has_key('recursive') or not self._query['recursive']):
-#             # TODO: use inotify for monitoring, this will also fix the
-#             # problem when files grow because they are copied right
-#             # now and the first time we had no real information
-#             dirname = self._query['dirname']
-#             for m in self._db._mountpoints:
-#                 if dirname.startswith(m.directory):
-#                     break
-#             WeakTimer(self.check, dirname, m).start(1)
-#         if self._query.has_key('device'):
-#             # monitor a media
-#             # TODO: support other stuff except cdrom
-#             # FIXME: support removing the monitor :)
-#             cdrom.monitor(query['device'], weakref(self), db, self._server)
-            
+        # FIXME: how to get updates on directories not monitored by
+        # inotify? Maybe poll the dirs when we have a query with
+        # dirname in it?
+        
 
-    @execute_in_timer(WeakTimer, 1)
-    def _poll(self):
+    def check(self, changes):
+        """
+        This function compares the last query result with the current db status
+        and will inform the client when there is a change.
+        """
         if self._checker:
-            # still checking
+            # Still checking. FIXME: what happens if new files are added during
+            # scan? For one part, the changes here are the item changes themselves,
+            # so we would update the client all the time. So it is better to wait
+            # here. Note: with inotify support this should not happen often.
+            WeakOneShotTimer(self.check, changes).start(1)
             return True
+
         current = self._db.query(**self._query)
+
+        # The query result length is different, this is a change
         if len(current) != len(self.items):
             self.items = current
-            if (current and isinstance(current[0], Item)) or \
-               (self.items and isinstance(self.items[0], Item)):
-                self._scan(False)
-            else:
-                self.callback('changed')
+            self.callback('changed')
             return True
+
+        # Same length and length is 0. No change here
         if len(current) == 0:
             return True
+
+        # Same length, check for changes inside the items
         if isinstance(current[0], Item):
-            for pos, c in enumerate(current):
-                if self.items[pos].url != c.url:
-                    self.items = current
-                    self._scan(False)
-                    return True
-        else:
-            for pos, c in enumerate(current):
-                if self.items[pos] != c:
+            for i in current:
+                if i._beacon_id in changes:
                     self.items = current
                     self.callback('changed')
                     return True
+            return True
+
+        # Same length, check if the strings itself did not change
+        for pos, c in enumerate(current):
+            if self.items[pos] != c:
+                self.items = current
+                self.callback('changed')
+                return True
         return True
 
 
     def _scan(self, first_call):
+        """
+        Start scanning the current list of items if they need to be updated.
+        With a full structure covered by inotify, there should be no changes.
+        """
         self._scan_step(self.items[:], [], first_call)
 
         
@@ -133,12 +166,31 @@
         Find changed items in 'items' and add them to changed.
         """
         if not items:
-            self._update(changed, first_call)
+            if not changed and first_call:
+                # no changes but it was our first call. Tell the client that everything
+                # is checked
+                self.callback('checked')
+                return False
+            if not changed:
+                # no changes but send the 'changed' ipc to the client
+                self.callback('changed')
+                return False
+            # We have some items that need an update. This will create a parser
+            # object checking all items in the list.
+            cb = Callback(self.checked, first_call)
+            c = parser.Checker(self.callback, self._db, changed, cb)
+            self._checker = c
+            if not first_call and len(changed) > 10:
+                # do not wait for the parser to send the changed signal, it may
+                # take a while.
+                self.callback('changed')
             return False
+
         c = 0
         while items:
             c += 1
             if c > 20:
+                # stop it and continue in the next step
                 return True
             i = items.pop(0)
             # FIXME: check parents
@@ -147,19 +199,6 @@
         return True
 
 
-    def _update(self, changed, first_call):
-        if changed:
-            cb = Callback(self.checked, first_call)
-            c = parser.Checker(self.callback, self._db, changed, cb)
-            self._checker = c
-            if not first_call and len(changed) > 10:
-                self.callback('changed')
-        elif first_call:
-            self.callback('checked')
-        else:
-            self.callback('changed')
-
-
     def checked(self, first_call):
         self._checker = None
         self.callback('changed')
@@ -172,56 +211,6 @@
             self._checker.stop()
         self._checker = None
 
-        
-    #     if self._query.has_key('device'):
-#             log.info('unable to update device query, just send notification here')
-#             # device query, can't update it
-#             if send_checked:
-#                 log.info('client.checked')
-#                 self.callback('checked')
-#                 return
-
-#         last_parent = None
-#         t1 = time.time()
-#         for i in self.items:
-#             # FIXME: this does not scale very good. For many items like a
-#             # recursive dir search it can take several seconds to scan all mtimes
-#             # and this is not an option.
-#             if not isinstance(i, item.Item):
-#                 # TODO: don't know how to monitor other types
-#                 continue
-
-#             # check parent and parent.parent mtime. Notice. The root
-#             # dir has also a parent, the media itself. So we need to stop at
-#             # parent.parent == None.
-#             parent = i.parent
-#             parent_check = []
-#             while last_parent != parent and parent and parent.parent:
-#                 mtime = parser.get_mtime(parent)
-#                 if mtime and parent.data['mtime'] != mtime and not parent in to_check:
-#                     parent_check.append(weakref(parent))
-#                 parent = parent.parent
-#             if parent_check:
-#                 parent_check.reverse()
-#                 to_check += parent_check
-#             last_parent = i.parent
-            
-#             mtime = parser.get_mtime(i)
-#             if not mtime:
-#                 continue
-#             if i.data['mtime'] == mtime:
-#                 continue
-#             to_check.append(weakref(i))
-
-#         if to_check:
-#             # FIXME: a constantly growing file like a recording will result in
-#             # a huge db activity on both client and server because checker calls
-#             # update again and the mtime changed.
-#             self._checker = weakref(parser.Checker(weakref(self), self._db, to_check))
-#         elif send_checked:
-#             log.info('client.checked')
-#             self.callback('checked')
-
 
     def __repr__(self):
         return '<beacon.Monitor for %s>' % self._query


-------------------------------------------------------
Using Tomcat but need to do more? Need to support web services, security?
Get stuff done quickly with pre-integrated technology to make your job easier
Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog

Reply via email to