Author: dmeyer
Date: Fri Apr 21 19:50:39 2006
New Revision: 1438
Modified:
trunk/beacon/src/db.py
trunk/beacon/src/monitor.py
Log:
Enable signal to the db for monitoring. Update the monitor
code to use that with a master class to avoid checking
every monitor at the same time. The db also handles add
and update in a cleaner way to avoid crashes when the db
is not what the caller had in mind when calling update or add.
Modified: trunk/beacon/src/db.py
==============================================================================
--- trunk/beacon/src/db.py (original)
+++ trunk/beacon/src/db.py Fri Apr 21 19:50:39 2006
@@ -106,6 +106,10 @@
length = (int, ATTR_SIMPLE),
content = (str, ATTR_SIMPLE))
+ self.signals = {
+ 'changed': kaa.notifier.Signal()
+ }
+
# commit
self._db.commit()
@@ -188,20 +192,52 @@
t1 = time.time()
changes = self.changes
+ changed_id = []
self.changes = []
- for function, arg1, args, kwargs in changes:
- # It could be possible that an item is added twice. But this is no
- # problem because the duplicate will be removed at the
- # next query. It can also happen that a dir is added because of
- # _getdir and because of the parser. We can't avoid that but the
- # db should clean itself up.
+ for function, arg1, kwargs in changes:
+ callback = None
if 'callback' in kwargs:
callback = kwargs['callback']
del kwargs['callback']
- callback(function(arg1, *args, **kwargs))
+ id = arg1
+ result = None
+ if function == 'delete':
+ # delete items and all subitems from the db. The delete
function
+ # will return all ids deleted, callbacks are not allowed, so
+ # we can just continue
+ changed_id.extend(self._delete(id))
+ continue
+ if function == 'update':
+ try:
+ result = self._db.update_object(id, **kwargs)
+ except Exception, e:
+ log.error('%s not in the db: %s' % (id, e))
+ elif function == 'add':
+ # arg1 is the type, kwargs should contain parent and name, the
+ # result is the return of a query, so it has (type, id)
+ if 'parent' in kwargs and 'name' in kwargs:
+ # make sure it is not in the db already
+ name = kwargs['name']
+ parent = kwargs['parent']
+ result = self._db.query(name=name, parent=parent)
+ if result:
+ log.warning('switch to update for %s in %s' % (name,
parent))
+ # we already have such an item, switch to update mode
+ result = result[0]
+ id = result['type'], result['id']
+ self._db.update_object(id, **kwargs)
+ if not result:
+ # add object to db
+ result = self._db.add_object(arg1, **kwargs)
+ id = result['type'], result['id']
else:
- function(arg1, *args, **kwargs)
+ # programming error, this should never happen
+ log.error('unknown change <%s>' % function)
+ changed_id.append(id)
+ if callback and result is not None:
+ callback(result)
self._db.commit()
+ self.signals['changed'].emit(changed_id)
log.info('db.commit took %s seconds' % (time.time() - t1))
@@ -211,12 +247,14 @@
items as parent (and so on). To avoid internal problems, make sure
commit is called just after this function is called.
"""
- log.info('DELETE %s', entry)
+ log.info('delete %s', entry)
if isinstance(entry, Item):
entry = entry._beacon_id
+ deleted = [ entry ]
for child in self._db.query(parent = entry):
- self._delete((child['type'], child['id']))
+ deleted.extend(self._delete((child['type'], child['id'])))
self._db.delete_object(entry)
+ return deleted
def _query_dir(self, parent):
@@ -266,7 +304,7 @@
# no client == server == write access
# delete from database by adding it to the internal changes
# list. It will be deleted right before the next commit.
- self.changes.append((self._delete, i, [], {}))
+ self.changes.append(('delete', i, {}))
# delete
if f == items[pos]._beacon_name:
# same file
@@ -283,7 +321,7 @@
if not self.client:
# no client == server == write access
for i in items[pos+1-len(items):]:
- self.changes.append((self._delete, i, [], {}))
+ self.changes.append(('delete', i, {}))
items = items[:pos+1-len(items)]
if self.changes:
@@ -443,9 +481,8 @@
Get the object with the given type, name and parent. This function will
look at the pending commits and also in the database.
"""
- for func, type, args, kwargs in self.changes:
- if func == self._db.add_object and \
- 'name' in kwargs and kwargs['name'] == name and \
+ for func, type, kwargs in self.changes:
+ if func == 'add' and 'name' in kwargs and kwargs['name'] == name
and \
'parent' in kwargs and kwargs['parent'] == parent:
self.commit()
break
@@ -455,8 +492,7 @@
return None
- def add_object(self, type, metadata=None, beacon_immediately=False,
- *args, **kwargs):
+ def add_object(self, type, metadata=None, beacon_immediately=False,
**kwargs):
"""
Add an object to the db. If the keyword 'beacon_immediately' is set,
the
object will be added now and the db will be locked until the next
commit.
@@ -470,14 +506,14 @@
if beacon_immediately:
self.commit()
- return self._db.add_object(type, *args, **kwargs)
- self.changes.append((self._db.add_object, type, args, kwargs))
+ return self._db.add_object(type, **kwargs)
+ self.changes.append(('add', type, kwargs))
if len(self.changes) > MAX_BUFFER_CHANGES:
self.commit()
def update_object(self, (type, id), metadata=None,
beacon_immediately=False,
- *args, **kwargs):
+ **kwargs):
"""
Update an object to the db. If the keyword 'beacon_immediately' is
set, the
object will be updated now and the db will be locked until the next
commit.
@@ -489,13 +525,13 @@
if metadata.has_key(key) and metadata[key] != None:
kwargs[key] = metadata[key]
- self.changes.append((self._db.update_object, (type, id), args, kwargs))
+ self.changes.append(('update', (type, id), kwargs))
if len(self.changes) > MAX_BUFFER_CHANGES or beacon_immediately:
self.commit()
def delete_object(self, (type, id), beacon_immediately=False):
- self.changes.append((self._delete, (type, id), [], {}))
+ self.changes.append(('delete', (type, id), {}))
if len(self.changes) > MAX_BUFFER_CHANGES or beacon_immediately:
self.commit()
Modified: trunk/beacon/src/monitor.py
==============================================================================
--- trunk/beacon/src/monitor.py (original)
+++ trunk/beacon/src/monitor.py Fri Apr 21 19:50:39 2006
@@ -38,7 +38,7 @@
# kaa imports
from kaa.weakref import weakref
-from kaa.notifier import WeakTimer, Timer, execute_in_timer, Callback
+from kaa.notifier import WeakTimer, WeakOneShotTimer, Timer, execute_in_timer,
Callback
# kaa.beacon imports
import parser
@@ -57,12 +57,39 @@
self.remote(self.id, __ipc_oneway=True, __ipc_noproxy_args=True,
*args, **kwargs)
+class Master(object):
+ def __init__(self, db):
+ self.monitors = []
+ self.timer = Timer(self.check)
+ db.signals['changed'].connect(self.changed)
+
+ def connect(self, monitor):
+ self.monitors.append((weakref(monitor), []))
+
+ def changed(self, changes):
+ for m, c in self.monitors:
+ c.extend(changes)
+ if not self.timer.active():
+ self.timer.start(0.02)
+
+ def check(self):
+ if not self.monitors:
+ return False
+ monitor, changes = self.monitors.pop(0)
+ if monitor == None:
+ return True
+ if changes:
+ monitor.check(changes)
+ self.monitors.append((monitor, []))
+
+_master = None
+
class Monitor(object):
"""
Monitor query for changes and call callback.
"""
-
def __init__(self, callback, db, server, id, query):
+ global _master
log.debug('create new monitor %s' % id)
self.id = id
self.callback = Notification(callback, self.id)
@@ -71,59 +98,65 @@
self._query = query
self._checker = None
self.items = self._db.query(**self._query)
+ if not _master:
+ _master = Master(db)
+ _master.connect(self)
if self.items and isinstance(self.items[0], Item):
self._scan(True)
- self._poll()
-# if self._query.has_key('dirname') and \
-# (not self._query.has_key('recursive') or not
self._query['recursive']):
-# # TODO: use inotify for monitoring, this will also fix the
-# # problem when files grow because they are copied right
-# # now and the first time we had no real information
-# dirname = self._query['dirname']
-# for m in self._db._mountpoints:
-# if dirname.startswith(m.directory):
-# break
-# WeakTimer(self.check, dirname, m).start(1)
-# if self._query.has_key('device'):
-# # monitor a media
-# # TODO: support other stuff except cdrom
-# # FIXME: support removing the monitor :)
-# cdrom.monitor(query['device'], weakref(self), db, self._server)
-
+ # FIXME: how to get updates on directories not monitored by
+ # inotify? Maybe poll the dirs when we have a query with
+ # dirname it it?
+
- @execute_in_timer(WeakTimer, 1)
- def _poll(self):
+ def check(self, changes):
+ """
+ This function compares the last query result with the current db status
+ and will inform the client when there is a change.
+ """
if self._checker:
- # still checking
+ # Still checking. FIXME: what happens if new files are added during
+ # scan? For one part, the changes here here the item changes
itself,
+ # so we would update the client all the time. So it is better to
wait
+ # here. Note: with inotify support this should not happen often.
+ WeakOneShotTimer(self.check, changes).start(1)
return True
+
current = self._db.query(**self._query)
+
+ # The query result length is different, this is a change
if len(current) != len(self.items):
self.items = current
- if (current and isinstance(current[0], Item)) or \
- (self.items and isinstance(self.items[0], Item)):
- self._scan(False)
- else:
- self.callback('changed')
+ self.callback('changed')
return True
+
+ # Same length and length is 0. No change here
if len(current) == 0:
return True
+
+ # Same length, check for changes inside the items
if isinstance(current[0], Item):
- for pos, c in enumerate(current):
- if self.items[pos].url != c.url:
- self.items = current
- self._scan(False)
- return True
- else:
- for pos, c in enumerate(current):
- if self.items[pos] != c:
+ for i in current:
+ if i._beacon_id in changes:
self.items = current
self.callback('changed')
return True
+ return True
+
+ # Same length, check if the strings itself did not change
+ for pos, c in enumerate(current):
+ if self.items[pos] != c:
+ self.items = current
+ self.callback('changed')
+ return True
return True
def _scan(self, first_call):
+ """
+ Start scanning the current list of items if they need to be updated.
+ With a full structure covered by inotify, there should be not changes.
+ """
self._scan_step(self.items[:], [], first_call)
@@ -133,12 +166,31 @@
Find changed items in 'items' and add them to changed.
"""
if not items:
- self._update(changed, first_call)
+ if not changed and first_call:
+ # no changes but it was our first call. Tell the client that
everything
+ # is checked
+ self.callback('checked')
+ return False
+ if not changed:
+ # no changes but send the 'changed' ipc to the client
+ self.callback('changed')
+ return False
+ # We have some items that need an update. This will create a parser
+ # object checking all items in the list.
+ cb = Callback(self.checked, first_call)
+ c = parser.Checker(self.callback, self._db, changed, cb)
+ self._checker = c
+ if not first_call and len(changed) > 10:
+ # do not wait for the parser to send the changed signal, it may
+ # take a while.
+ self.callback('changed')
return False
+
c = 0
while items:
c += 1
if c > 20:
+ # stop it and continue in the next step
return True
i = items.pop(0)
# FIXME: check parents
@@ -147,19 +199,6 @@
return True
- def _update(self, changed, first_call):
- if changed:
- cb = Callback(self.checked, first_call)
- c = parser.Checker(self.callback, self._db, changed, cb)
- self._checker = c
- if not first_call and len(changed) > 10:
- self.callback('changed')
- elif first_call:
- self.callback('checked')
- else:
- self.callback('changed')
-
-
def checked(self, first_call):
self._checker = None
self.callback('changed')
@@ -172,56 +211,6 @@
self._checker.stop()
self._checker = None
-
- # if self._query.has_key('device'):
-# log.info('unable to update device query, just send notification
here')
-# # device query, can't update it
-# if send_checked:
-# log.info('client.checked')
-# self.callback('checked')
-# return
-
-# last_parent = None
-# t1 = time.time()
-# for i in self.items:
-# # FIXME: this does not scale very good. For many items like a
-# # recursive dir search it can take several seconds to scan all
mtimes
-# # and this is not an option.
-# if not isinstance(i, item.Item):
-# # TODO: don't know how to monitor other types
-# continue
-
-# # check parent and parent.parent mtime. Notice. The root
-# # dir has also a parent, the media itself. So we need to stop at
-# # parent.parent == None.
-# parent = i.parent
-# parent_check = []
-# while last_parent != parent and parent and parent.parent:
-# mtime = parser.get_mtime(parent)
-# if mtime and parent.data['mtime'] != mtime and not parent in
to_check:
-# parent_check.append(weakref(parent))
-# parent = parent.parent
-# if parent_check:
-# parent_check.reverse()
-# to_check += parent_check
-# last_parent = i.parent
-
-# mtime = parser.get_mtime(i)
-# if not mtime:
-# continue
-# if i.data['mtime'] == mtime:
-# continue
-# to_check.append(weakref(i))
-
-# if to_check:
-# # FIXME: a constantly growing file like a recording will result
in
-# # a huge db activity on both client and server because checker
calls
-# # update again and the mtime changed.
-# self._checker = weakref(parser.Checker(weakref(self), self._db,
to_check))
-# elif send_checked:
-# log.info('client.checked')
-# self.callback('checked')
-
def __repr__(self):
return '<beacon.Monitor for %s>' % self._query
-------------------------------------------------------
Using Tomcat but need to do more? Need to support web services, security?
Get stuff done quickly with pre-integrated technology to make your job easier
Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog