Author: tack
Date: Sun Mar 11 21:45:28 2007
New Revision: 2547
Modified:
trunk/epg/src/server.py
trunk/epg/src/sources/zap2it.py
Log:
Rewrite zap2it source to use xml.sax and to support the recent threading changes;
use a single timer for the jobs handler to prevent multiple timers from running
(harmless but ugly); use a mutex so we don't need to loop in add_program_wait.
Modified: trunk/epg/src/server.py
==============================================================================
--- trunk/epg/src/server.py (original)
+++ trunk/epg/src/server.py Sun Mar 11 21:45:28 2007
@@ -33,6 +33,7 @@
import logging
import time
from types import ListType
+import threading
# kaa imports
from kaa.db import *
@@ -78,7 +79,11 @@
self._clients = []
self._db = db
self._rpc_server = []
+
+ # Members for job queue.
self._jobs = []
+ self._jobs_lock = threading.Lock()
+ self._jobs_timer = kaa.notifier.WeakTimer(self._handle_jobs)
# initial sync
self.sync()
@@ -97,7 +102,6 @@
The result parameter is not used but given by the InProgress callback
when this function is called after an update.
"""
- log.info('commit database changes')
self._db.commit()
# Load some basic information from the db.
@@ -132,6 +136,8 @@
log.info('update client %s', client)
client.rpc('guide.update', info)
+ log.info('Database commit; %d programs in db' % self._num_programs)
+
# -------------------------------------------------------------------------
# kaa.rpc interface called by kaa.epg.Client
@@ -293,13 +299,17 @@
Handle waiting add_program jobs.
"""
t0 = time.time()
+ self._jobs_lock.acquire(False)
while self._jobs:
if time.time() - t0 > 0.05:
# time to return to the main loop
- return
kaa.notifier.OneShotTimer(self._handle_jobs).start(0.001)
+ return True
#kaa.notifier.OneShotTimer(self._handle_jobs).start(0.001)
args = self._jobs.pop()
self.add_program(*args[:-1], **args[-1])
+ self._jobs_lock.release()
+ return False
+
def add_program(self, channel_db_id, start, stop, title, **attributes):
"""
@@ -308,12 +318,14 @@
"""
if not kaa.notifier.is_mainthread():
self._jobs.append((channel_db_id, start, stop, title, attributes))
- if len(self._jobs) > 100:
+ if len(self._jobs) == 1:
+ # Job added to (probably) empty queue, begin timer to handle jobs.
+ # If timer is already running, this does nothing.
+ self._jobs_timer.start(0.001)
+ elif len(self._jobs) > 100:
# too many jobs pending, wait before adding new
while len(self._jobs) > 30:
time.sleep(0.1)
- if len(self._jobs) == 1:
- kaa.notifier.OneShotTimer(self._handle_jobs).start(0.1)
return
start = int(start)
@@ -367,5 +379,7 @@
"""
if kaa.notifier.is_mainthread():
raise RuntimeError('add_program_wait not called by thread')
- while self._jobs:
- time.sleep(0.1)
+
+ # Jobs lock is held as long as the jobs handler timer is running.
+ self._jobs_lock.acquire()
+ self._jobs_lock.release()
Modified: trunk/epg/src/sources/zap2it.py
==============================================================================
--- trunk/epg/src/sources/zap2it.py (original)
+++ trunk/epg/src/sources/zap2it.py Sun Mar 11 21:45:28 2007
@@ -7,7 +7,7 @@
# kaa.epg - EPG Database
# Copyright (C) 2004-2007 Jason Tackaberry, Dirk Meyer, Rob Shortt
#
-# First Edition: Jason Tackaberry <[EMAIL PROTECTED]>
+# First Edition: Jason Tackaberry <[EMAIL PROTECTED]>
#
# Please see the file AUTHORS for a complete list of authors.
#
@@ -35,9 +35,7 @@
import gzip
import calendar
import logging
-from StringIO import StringIO
-
-import libxml2
+import xml.sax
# kaa imports
import kaa
@@ -48,6 +46,7 @@
# get logging object
log = logging.getLogger('epg')
+# FIXME: should be configurable.
ZAP2IT_HOST = "datadirect.webservices.zap2it.com:80"
ZAP2IT_URI = "/tvlistings/xtvdService"
@@ -135,6 +134,7 @@
log.info('Connected in %.02f seconds; downloading guide update.' %
(time.time() - t0))
t0 = time.time()
+ # FIXME: check response header to see if data is compressed.
fname = '/tmp/zap2it.xml.gz'
dfile = open(fname, 'w+')
# Read data in 50KB chunks.
@@ -151,121 +151,123 @@
return fname
-def iternode(node):
- child = node.children
- while child:
- yield child
- child = child.get_next()
-
-
-def parse_station(node, info):
- id = node.prop("id")
- d = {}
- for child in iternode(node):
- if child.name == "callSign":
- d["station"] = str_to_unicode(child.content)
- elif child.name == "name":
- d["name"] = str_to_unicode(child.content)
- info.stations_by_id[id] = d
-
-
-def parse_map(node, info):
- id = node.prop("station")
- if id not in info.stations_by_id:
- # Shouldn't happen.
- return
-
- channel = int(node.prop("channel"))
- db_id = info.add_channel(tuner_id=channel,
- name=info.stations_by_id[id]["station"],
- long_name=info.stations_by_id[id]["name"])
- info.stations_by_id[id]["db_id"] = db_id
+class Handler(xml.sax.handler.ContentHandler):
+ def __init__(self, epg):
+ xml.sax.handler.ContentHandler.__init__(self)
+ self._epg = epg
+ self._obj_type = None
+ self._stations_by_id = {}
+ self._schedule_by_program = {}
-def parse_schedule(node, info):
- program_id = node.prop("program")
- if program_id not in info.schedules_by_id:
- return
- d = info.schedules_by_id[program_id]
- d["station_id"] = node.prop("station")
- t = time.strptime(node.prop("time")[:-1], "%Y-%m-%dT%H:%M:%S")
- d["start"] = int(calendar.timegm(t))
- duration = node.prop("duration")
-
- # Assumes duration is in the form PT00H00M
- duration_secs = (int(duration[2:4])*60 + int(duration[5:7]))*60
- d["stop"] = d["start"] + duration_secs
- d["rating"] = str_to_unicode(node.prop("tvRating"))
-
- info.add_program(info.stations_by_id[d["station_id"]]["db_id"], d["start"],
- d["stop"], d.get("title"), desc=d.get("desc"))
-
-
-def parse_program(node, info):
- program_id = node.prop("id")
- d = {}
- for child in iternode(node):
- if child.name == "title":
- d["title"] = str_to_unicode(child.content)
- elif child.name == "description":
- d["desc"] = str_to_unicode(child.content)
-
- info.schedules_by_id[program_id] = d
-
-
-def find_roots(node, roots = {}):
- for child in iternode(node):
- if child.name == "stations":
- roots["stations"] = child
- elif child.name == "lineup":
- roots["lineup"] = child
- elif child.name == "schedules":
- roots["schedules"] = child
- elif child.name == "programs":
- roots["programs"] = child
- elif child.name == "productionCrew":
- roots["crew"] = child
- elif child.children:
- find_roots(child, roots)
- if len(roots) == 5:
+ def startElement(self, name, attrs):
+ self._node_name = name
+ if self._obj_type:
return
-class UpdateInfo:
- pass
-
[EMAIL PROTECTED]('epg')
-def _parse_xml(start, stop):
- filename = request(str(config.username), str(config.password),
- ZAP2IT_HOST, ZAP2IT_URI, start, stop)
- if not filename:
- return
- doc = libxml2.parseFile(filename)
+ if name == 'schedule':
+ # Schedule elements map programs to times, but they are defined
+ # before programs in the xml, so we have to keep this map in
+ # memory.
+ try:
+ program = attrs.getValue('program')
+ station = attrs.getValue('station')
+ pgtime = attrs.getValue('time')
+ duration = attrs.getValue('duration')
+ rating = None
+ if 'rating' in attrs.getNames():
+ # Optional attribute.
+ rating = attrs.getValue('tvRating')
+ except KeyError, e:
+ log.warning('Malformed schedule element; no %s attribute.' % e)
+
+ if station not in self._stations_by_id:
+ log.warning('Schedule defined for unknown station %s' %
station)
+ return
+
+ t = time.strptime(pgtime[:-1], '%Y-%m-%dT%H:%M:%S')
+ start = int(calendar.timegm(t))
+ # Assumes duration is in the form PT00H00M
+ duration_secs = (int(duration[2:4])*60 + int(duration[5:7]))*60
+ stop = start + duration_secs
+
+ if program not in self._schedule_by_program:
+ self._schedule_by_program[program] = []
+ self._schedule_by_program[program].append({
+ 'station': self._stations_by_id[station],
+ 'time': pgtime,
+ 'duration': duration_secs,
+ 'start': start,
+ 'stop': stop,
+ 'rating': rating
+ })
+
+ elif name == 'program':
+ try:
+ self._obj = { 'id': attrs.getValue('id') }
+ self._obj_type = name
+ except KeyError, e:
+ log.warning('Malformed program element; no %s attribute.' % e)
+
+ elif name == 'station':
+ try:
+ self._obj = { 'id': attrs.getValue('id') }
+ self._obj_type = name
+ except KeyError, e:
+ log.warning('Malformed station element; no %s attribute.' % e)
+
+ elif name == 'map':
+ try:
+ station = attrs.getValue('station')
+ channel = int(attrs.getValue('channel'))
+ except KeyError, e:
+ return log.warning('Malformed map element; no %s attribute.' %
e)
+ except ValueError:
+ return log.warning('Malformed map element; channel is not an
integer.')
+
+ if station not in self._stations_by_id:
+ # Maps may reference stations that haven't been defined; I
+ # believe we can safely ignore these.
+ return
+
+ db_id = self._epg.add_channel(tuner_id = channel,
+ name =
self._stations_by_id[station]['callSign'],
+ long_name =
self._stations_by_id[station]['name'])
+ self._stations_by_id[station]['db_id'] = db_id
+
+
+ def characters(self, content):
+ if self._obj_type == 'program':
+ if self._node_name in ('title', 'description'):
+ self._obj[self._node_name] = self._obj.get(self._node_name,
'') + content
+ elif self._obj_type == 'station':
+ if self._node_name in ('callSign', 'name'):
+ self._obj[self._node_name] = self._obj.get(self._node_name,
'') + content
+
+
+ def endElement(self, name):
+ self._node_name = None
+ if name == 'station':
+ self._obj_type = None
+ self._stations_by_id[self._obj['id']] = self._obj
+ elif name == 'program':
+ self._obj_type = None
+ program = self._obj
+ if program['id'] not in self._schedule_by_program:
+ # Program defined for which there is no schedule.
+ return
+
+ for schedule in self._schedule_by_program[program['id']]:
+ channel_db_id = schedule['station']['db_id']
+ self._epg.add_program(channel_db_id, schedule['start'],
schedule['stop'],
+ program.get('title'), desc =
program.get('description'))
- stations_by_id = {}
- roots = {}
- find_roots(doc, roots)
- nprograms = 0
- for child in iternode(roots["schedules"]):
- if child.name == "schedule":
- nprograms += 1
-
- info = UpdateInfo()
- info.doc = doc
- info.roots = [roots["stations"], roots["lineup"], roots["programs"], \
- roots["schedules"]]
- info.node_names = ["station", "map", "program", "schedule"]
- info.node = None
- info.total = nprograms
- info.schedules_by_id = {}
- info.stations_by_id = stations_by_id
- info.progress_step = info.total / 100
- info.t0 = time.time()
-
-
[EMAIL PROTECTED]()
[EMAIL PROTECTED]('epg')
def update(epg, start = None, stop = None):
+ from gzip import GzipFile
+
if not start:
# If start isn't specified, choose current time (rounded down to the
# nearest hour).
@@ -274,35 +276,15 @@
# If stop isn't specified, use 24 hours after start.
stop = start + (24 * 60 * 60)
- # _parse_xml is forced to be executed in a thread. This means that
- # it always returns an InProgress object that needs to be yielded.
- # When yield returns we need to call the InProgress object to get
- # the result. If the result is None, the thread run into an error.
- info = _parse_xml(start, stop)
- yield info
- info = info()
- if not info:
- yield False
+ filename = request(str(config.username), str(config.password),
ZAP2IT_HOST, ZAP2IT_URI, start, stop)
+ if not filename:
+ return
- info.add_program = epg.add_program
- info.add_channel = epg.add_channel
- t0 = time.time()
- while info.node or info.roots:
- if not info.node:
- info.node = info.roots.pop(0).children
- info.cur_node_name = info.node_names.pop(0)
-
- if info.node.name == info.cur_node_name:
- globals()["parse_%s" % info.cur_node_name](info.node, info)
-
- info.node = info.node.get_next()
- if time.time() - t0 > 0.1:
- # time to return to the main loop
- yield kaa.notifier.YieldContinue
- t0 = time.time()
-
- os.unlink(info.doc.name)
- info.doc.freeDoc()
- log.info('Processed %d programs in %.02f seconds', epg._num_programs,
- time.time() - info.t0)
- yield True
+ parser = xml.sax.make_parser()
+ parser.setContentHandler(Handler(epg))
+ t0=time.time()
+ parser.parse(GzipFile(filename))
+ os.unlink(filename)
+ epg.add_program_wait()
+ log.info('zap2it XML parsing took %.03f seconds' % (time.time() - t0))
+ return True
-------------------------------------------------------------------------
Take Surveys. Earn Cash. Influence the Future of IT
Join SourceForge.net's Techsay panel and you'll get the chance to share your
opinions on IT & business topics through brief surveys-and earn cash
http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV
_______________________________________________
Freevo-cvslog mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/freevo-cvslog