Author: tack
Date: Sun Mar 11 21:45:28 2007
New Revision: 2547

Modified:
   trunk/epg/src/server.py
   trunk/epg/src/sources/zap2it.py

Log:
Rewrite the zap2it source to use xml.sax and to support the recent threading changes;
use a single timer for the jobs handler so that multiple timers cannot run at once
(harmless but ugly); use a mutex so that add_program_wait no longer has to poll in a loop


Modified: trunk/epg/src/server.py
==============================================================================
--- trunk/epg/src/server.py     (original)
+++ trunk/epg/src/server.py     Sun Mar 11 21:45:28 2007
@@ -33,6 +33,7 @@
 import logging
 import time
 from types import ListType
+import threading
 
 # kaa imports
 from kaa.db import *
@@ -78,7 +79,11 @@
         self._clients = []
         self._db = db
         self._rpc_server = []
+        
+        # Members for job queue.
         self._jobs = []
+        self._jobs_lock = threading.Lock()
+        self._jobs_timer = kaa.notifier.WeakTimer(self._handle_jobs)
 
         # initial sync
         self.sync()
@@ -97,7 +102,6 @@
         The result parameter is not used but given by the InProgress callback
         when this function is called after an update.
         """
-        log.info('commit database changes')
         self._db.commit()
 
         # Load some basic information from the db.
@@ -132,6 +136,8 @@
             log.info('update client %s', client)
             client.rpc('guide.update', info)
 
+        log.info('Database commit; %d programs in db' % self._num_programs)
+
 
     # -------------------------------------------------------------------------
     # kaa.rpc interface called by kaa.epg.Client
@@ -293,13 +299,17 @@
         Handle waiting add_program jobs.
         """
         t0 = time.time()
+        self._jobs_lock.acquire(False)
         while self._jobs:
             if time.time() - t0 > 0.05:
                 # time to return to the main loop
-                return 
kaa.notifier.OneShotTimer(self._handle_jobs).start(0.001)
+                return True 
#kaa.notifier.OneShotTimer(self._handle_jobs).start(0.001)
             args = self._jobs.pop()
             self.add_program(*args[:-1], **args[-1])
 
+        self._jobs_lock.release()
+        return False
+
 
     def add_program(self, channel_db_id, start, stop, title, **attributes):
         """
@@ -308,12 +318,14 @@
         """
         if not kaa.notifier.is_mainthread():
             self._jobs.append((channel_db_id, start, stop, title, attributes))
-            if len(self._jobs) > 100:
+            if len(self._jobs) == 1:
+                # Job added to (probably) empty queue, begin timer to handle 
jobs
+                # If timer is already running, this does nothing.
+                self._jobs_timer.start(0.001)
+            elif len(self._jobs) > 100:
                 # too many jobs pending, wait before adding new
                 while len(self._jobs) > 30:
                     time.sleep(0.1)
-            if len(self._jobs) == 1:
-                kaa.notifier.OneShotTimer(self._handle_jobs).start(0.1)
             return
         
         start = int(start)
@@ -367,5 +379,7 @@
         """
         if kaa.notifier.is_mainthread():
             raise RuntimeError('add_program_wait not called by thread')
-        while self._jobs:
-            time.sleep(0.1)
+
+        # Jobs lock is held as long as the jobs handler timer is running.
+        self._jobs_lock.acquire()
+        self._jobs_lock.release()

Modified: trunk/epg/src/sources/zap2it.py
==============================================================================
--- trunk/epg/src/sources/zap2it.py     (original)
+++ trunk/epg/src/sources/zap2it.py     Sun Mar 11 21:45:28 2007
@@ -7,7 +7,7 @@
 # kaa.epg - EPG Database
 # Copyright (C) 2004-2007 Jason Tackaberry, Dirk Meyer, Rob Shortt
 #
-# First Edition: Jason Tackaberry <[EMAIL PROTECTED]>
+# First Edition: Jason Tackaberry <[EMAIL PROTECTED]>
 #
 # Please see the file AUTHORS for a complete list of authors.
 #
@@ -35,9 +35,7 @@
 import gzip
 import calendar
 import logging
-from StringIO import StringIO
-
-import libxml2
+import xml.sax
 
 # kaa imports
 import kaa
@@ -48,6 +46,7 @@
 # get logging object
 log = logging.getLogger('epg')
 
+# FIXME: should be configurable.
 ZAP2IT_HOST = "datadirect.webservices.zap2it.com:80"
 ZAP2IT_URI = "/tvlistings/xtvdService"
 
@@ -135,6 +134,7 @@
     log.info('Connected in %.02f seconds; downloading guide update.' % 
(time.time() - t0))
 
     t0 = time.time()
+    # FIXME: check response header to see if data is compressed.
     fname = '/tmp/zap2it.xml.gz'
     dfile = open(fname, 'w+')
     # Read data in 50KB chunks.
@@ -151,121 +151,123 @@
     return fname
 
 
-def iternode(node):
-    child = node.children
-    while child:
-        yield child
-        child = child.get_next()
-
-
-def parse_station(node, info):
-    id = node.prop("id")
-    d = {}
-    for child in iternode(node):
-        if child.name == "callSign":
-            d["station"] = str_to_unicode(child.content)
-        elif child.name == "name":
-            d["name"] = str_to_unicode(child.content)
-    info.stations_by_id[id] = d
-
-
-def parse_map(node, info):
-    id = node.prop("station")
-    if id not in info.stations_by_id:
-        # Shouldn't happen.
-        return
-
-    channel = int(node.prop("channel"))
-    db_id = info.add_channel(tuner_id=channel,
-                             name=info.stations_by_id[id]["station"],
-                             long_name=info.stations_by_id[id]["name"])
-    info.stations_by_id[id]["db_id"] = db_id
 
+class Handler(xml.sax.handler.ContentHandler):
+    def __init__(self, epg):
+        xml.sax.handler.ContentHandler.__init__(self)
+        self._epg = epg
+        self._obj_type = None
+        self._stations_by_id = {}
+        self._schedule_by_program = {}
 
 
-def parse_schedule(node, info):
-    program_id = node.prop("program")
-    if program_id not in info.schedules_by_id:
-        return
-    d = info.schedules_by_id[program_id]
-    d["station_id"] = node.prop("station")
-    t = time.strptime(node.prop("time")[:-1], "%Y-%m-%dT%H:%M:%S")
-    d["start"] = int(calendar.timegm(t))
-    duration = node.prop("duration")
-
-    # Assumes duration is in the form PT00H00M
-    duration_secs = (int(duration[2:4])*60 + int(duration[5:7]))*60
-    d["stop"] = d["start"] + duration_secs
-    d["rating"] = str_to_unicode(node.prop("tvRating"))
-
-    info.add_program(info.stations_by_id[d["station_id"]]["db_id"], d["start"],
-                     d["stop"], d.get("title"), desc=d.get("desc"))
-
-
-def parse_program(node, info):
-    program_id = node.prop("id")
-    d = {}
-    for child in iternode(node):
-        if child.name == "title":
-            d["title"] = str_to_unicode(child.content)
-        elif child.name == "description":
-            d["desc"] = str_to_unicode(child.content)
-
-    info.schedules_by_id[program_id] = d
-
-
-def find_roots(node, roots = {}):
-    for child in iternode(node):
-        if child.name == "stations":
-            roots["stations"] = child
-        elif child.name == "lineup":
-            roots["lineup"] = child
-        elif child.name == "schedules":
-            roots["schedules"] = child
-        elif child.name == "programs":
-            roots["programs"] = child
-        elif child.name == "productionCrew":
-            roots["crew"] = child
-        elif child.children:
-            find_roots(child, roots)
-        if len(roots) == 5:
+    def startElement(self, name, attrs):
+        self._node_name = name
+        if self._obj_type:
             return
 
-class UpdateInfo:
-    pass
-
[EMAIL PROTECTED]('epg')
-def _parse_xml(start, stop):
-    filename = request(str(config.username), str(config.password),
-                       ZAP2IT_HOST, ZAP2IT_URI, start, stop)
-    if not filename:
-        return
-    doc = libxml2.parseFile(filename)
+        if name == 'schedule':
+            # Schedule elements map programs to times, but they are defined
+            # before programs in the xml, so we have to keep this map in
+            # memory.
+            try:
+                program = attrs.getValue('program')
+                station = attrs.getValue('station')
+                pgtime = attrs.getValue('time')
+                duration = attrs.getValue('duration')
+                rating = None
+                if 'rating' in attrs.getNames():
+                    # Optional attribute.
+                    rating = attrs.getValue('tvRating')
+            except KeyError, e:
+                log.warning('Malformed schedule element; no %s attribute.' % e)
+
+            if station not in self._stations_by_id:
+                log.warning('Schedule defined for unknown station %s' % 
station)
+                return
+            
+            t = time.strptime(pgtime[:-1], '%Y-%m-%dT%H:%M:%S')
+            start = int(calendar.timegm(t))
+            # Assumes duration is in the form PT00H00M
+            duration_secs = (int(duration[2:4])*60 + int(duration[5:7]))*60
+            stop = start + duration_secs
+
+            if program not in self._schedule_by_program:
+                self._schedule_by_program[program] = []
+            self._schedule_by_program[program].append({
+                'station': self._stations_by_id[station],
+                'time': pgtime,
+                'duration': duration_secs,
+                'start': start,
+                'stop': stop,
+                'rating': rating
+            })
+
+        elif name == 'program':
+            try:
+                self._obj = { 'id': attrs.getValue('id') }
+                self._obj_type = name
+            except KeyError, e:
+                log.warning('Malformed program element; no %s attribute.' % e)
+
+        elif name == 'station':
+            try:
+                self._obj = { 'id': attrs.getValue('id') }
+                self._obj_type = name
+            except KeyError, e:
+                log.warning('Malformed station element; no %s attribute.' % e)
+
+        elif name == 'map':
+            try:
+                station = attrs.getValue('station')
+                channel = int(attrs.getValue('channel'))
+            except KeyError, e:
+                return log.warning('Malformed map element; no %s attribute.' % 
e)
+            except ValueError:
+                return log.warning('Malformed map element; channel is not an 
integer.')
+
+            if station not in self._stations_by_id:
+                # Maps may references stations that haven't been defined; I
+                # believe we can safely ignore these.
+                return
+
+            db_id = self._epg.add_channel(tuner_id = channel,
+                                         name = 
self._stations_by_id[station]['callSign'],
+                                         long_name = 
self._stations_by_id[station]['name'])
+            self._stations_by_id[station]['db_id'] = db_id
+            
+
+    def characters(self, content):
+        if self._obj_type == 'program':
+            if self._node_name in ('title', 'description'):
+                self._obj[self._node_name] = self._obj.get(self._node_name, 
'') + content
+        elif self._obj_type == 'station':
+            if self._node_name in ('callSign', 'name'):
+                self._obj[self._node_name] = self._obj.get(self._node_name, 
'') + content
+
+
+    def endElement(self, name):
+        self._node_name = None
+        if name == 'station':
+            self._obj_type = None
+            self._stations_by_id[self._obj['id']] = self._obj
+        elif name == 'program':
+            self._obj_type = None
+            program = self._obj
+            if program['id'] not in self._schedule_by_program:
+                # Program defined for which there is no schedule.
+                return
+
+            for schedule in self._schedule_by_program[program['id']]:
+                channel_db_id = schedule['station']['db_id']
+                self._epg.add_program(channel_db_id, schedule['start'], 
schedule['stop'],
+                                      program.get('title'), desc = 
program.get('description'))
 
-    stations_by_id = {}
-    roots = {}
 
-    find_roots(doc, roots)
-    nprograms = 0
-    for child in iternode(roots["schedules"]):
-        if child.name == "schedule":
-            nprograms += 1
-
-    info = UpdateInfo()
-    info.doc = doc
-    info.roots = [roots["stations"], roots["lineup"], roots["programs"], \
-                  roots["schedules"]]
-    info.node_names = ["station", "map", "program", "schedule"]
-    info.node = None
-    info.total = nprograms
-    info.schedules_by_id = {}
-    info.stations_by_id = stations_by_id
-    info.progress_step = info.total / 100
-    info.t0 = time.time()
-
-
[EMAIL PROTECTED]()
[EMAIL PROTECTED]('epg')
 def update(epg, start = None, stop = None):
+    from gzip import GzipFile
+
     if not start:
         # If start isn't specified, choose current time (rounded down to the
         # nearest hour).
@@ -274,35 +276,15 @@
         # If stop isn't specified, use 24 hours after start.
         stop = start + (24 * 60 * 60)
 
-    # _parse_xml is forced to be executed in a thread. This means that
-    # it always returns an InProgress object that needs to be yielded.
-    # When yield returns we need to call the InProgress object to get
-    # the result. If the result is None, the thread run into an error.
-    info = _parse_xml(start, stop)
-    yield info
-    info = info()
-    if not info:
-        yield False
+    filename = request(str(config.username), str(config.password), 
ZAP2IT_HOST, ZAP2IT_URI, start, stop)
+    if not filename:
+        return
 
-    info.add_program = epg.add_program
-    info.add_channel = epg.add_channel
-    t0 = time.time()
-    while info.node or info.roots:
-        if not info.node:
-            info.node = info.roots.pop(0).children
-            info.cur_node_name = info.node_names.pop(0)
-
-        if info.node.name == info.cur_node_name:
-            globals()["parse_%s" % info.cur_node_name](info.node, info)
-
-        info.node = info.node.get_next()
-        if time.time() - t0 > 0.1:
-            # time to return to the main loop
-            yield kaa.notifier.YieldContinue
-            t0 = time.time()
-
-    os.unlink(info.doc.name)
-    info.doc.freeDoc()
-    log.info('Processed %d programs in %.02f seconds', epg._num_programs,
-             time.time() - info.t0)
-    yield True
+    parser = xml.sax.make_parser()
+    parser.setContentHandler(Handler(epg))
+    t0=time.time()
+    parser.parse(GzipFile(filename))
+    os.unlink(filename)
+    epg.add_program_wait()
+    log.info('zap2it XML parsing took %.03f seconds' % (time.time() - t0))
+    return True

-------------------------------------------------------------------------
Take Surveys. Earn Cash. Influence the Future of IT
Join SourceForge.net's Techsay panel and you'll get the chance to share your
opinions on IT & business topics through brief surveys-and earn cash
http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog

Reply via email to