Author: sleon
Date: 2006-01-09 02:48:15 +0000 (Mon, 09 Jan 2006)
New Revision: 7819
Added:
trunk/apps/freeviz/server_twist.py
Modified:
trunk/apps/freeviz/db.py
trunk/apps/freeviz/gen.py
trunk/apps/freeviz/handler.py
trunk/apps/freeviz/server.py
Log:
implemented own server , which can work with synchronous sqlobjects fixed lots
of bugs relted to threading and stability
Modified: trunk/apps/freeviz/db.py
===================================================================
--- trunk/apps/freeviz/db.py 2006-01-08 12:24:34 UTC (rev 7818)
+++ trunk/apps/freeviz/db.py 2006-01-09 02:48:15 UTC (rev 7819)
@@ -5,13 +5,15 @@
#mport pydot
-uri = 'mysql://twisted:severe at 127.0.0.1/twisted?cache=False'
-con = connectionForURI(uri)
-sqlhub.processConnection = con
+uri = 'postgres://tiwsted:severe at 127.0.0.1/twisted?debug=True'
+#con = connectionForURI(uri)
+#sqlhub.processConnection = con
+hub = dbconnection.ConnectionHub()
class NodePair(SQLObject):
_cacheValue = False
+ _connection = hub
node1 = ForeignKey('Node', notNull=True)
node2 = ForeignKey('Node', notNull=True)
@@ -19,10 +21,12 @@
backoffmax_node2 = StringCol(length=50, default='5000')
backoffcur_node1 = StringCol(length=50, default='0')
backoffcur_node2 = StringCol(length=50, default='0')
+ nodes = RelatedJoin('Node')
index = DatabaseIndex('node1','node2', unique=True)
class Node(SQLObject):
_cacheValue = False
+ _connection = hub
identity = StringCol(length=100, notNull=True)
lastUpdate = DateTimeCol(notNull=True, default=datetime.datetime.now())
name = StringCol(length=50, notNull=True, default='dummy')
@@ -37,23 +41,35 @@
transferring_requests = StringCol(length=10, notNull=True, default='0')
address = StringCol(length=32,notNull=True,default='0.0.0.0:0')
+ active=StringCol(length=1,notNull=True,default='N')
+ edges = RelatedJoin('NodePair')
index = DatabaseIndex('identity',unique=True)
+
+
#index2 = DatabaseIndex('name',unique=True)
def init():
- Node.createTable()
- NodePair.createTable()
+ con = get_con()
+ Node.createTable(connection=con)
+ NodePair.createTable(connection=con)
+
def drop():
- Node.dropTable()
- NodePair.dropTable()
+ con = get_con()
+ NodePair.dropTable(connection=con)
+
+ Node.dropTable(connection=con)
+
def reinit():
+ con = get_con()
drop()
init()
-def get_trans():
- return con.transaction()
+def get_con():
+ hub.threadConnection = connectionForURI(uri)
+
+ return hub.getConnection()
def getLastGoodVer(trans):
return trans.queryOne('SELECT MAX(last_good_version) from node')[0]
@@ -61,11 +77,16 @@
def delete_conns(nodeinfo, trans):
nodeid = getIdFromInfo(nodeinfo,trans)
- l = NodePair.select(connection=trans)
- for i in l:
- if i.node1.id == nodeid or i.node2.id == nodeid:
- i.delete(i.id)
+ node = Node.get(nodeid, connection=trans)
+ edges = node.edges
+ for edge in edges:
+ node1=edge.node1
+ node2=edge.node2
+ node1.removeNodePair(edge)
+ node2.removeNodePair(edge)
+ edge.delete(edge.id)
+
def exists(nodeinfo,trans):
result = Node.select(Node.q.identity ==
nodeinfo['identity'],connection=trans)
@@ -74,6 +95,8 @@
else:
return False
+def number_edges(node):
+ return len(list(node.edges))
def refresh(nodeinfo,trans):
if exists(nodeinfo,trans):
@@ -95,7 +118,7 @@
else:
raise Exception('No such node!')
-def insert(nodeinfo1, nodeinfo2, backoff1={}, backoff2={},trans=con):
+def insert(trans,nodeinfo1, nodeinfo2, backoff1={}, backoff2={}):
#NodePair.createTable( ifNotExists=True)
node1 = getIdFromInfo(nodeinfo1,trans)
@@ -112,6 +135,8 @@
bla = NodePair( node1=node1, node2=node2,connection=trans )
+ Node.get(node1, connection=trans).addNodePair(bla)
+ Node.get(node2, connection=trans).addNodePair(bla)
if backoff1:
bla.backoffmax_node1 = backoff1['backoffmax']
bla.backoffcur_node1 = backoff1['backoffcur']
Modified: trunk/apps/freeviz/gen.py
===================================================================
--- trunk/apps/freeviz/gen.py 2006-01-08 12:24:34 UTC (rev 7818)
+++ trunk/apps/freeviz/gen.py 2006-01-09 02:48:15 UTC (rev 7819)
@@ -103,10 +103,10 @@
lastgoodver = db.getLastGoodVer(trans)
#counts edges for a node
- edge_count={}
+ #edge_count={}
for node in nodes:
-
+ #edge_count[node.name]=0
nodecolor=self.nodeOK
transinfosize=self.defaultSize
@@ -114,6 +114,8 @@
nversion = self.regver.match(node.version).group(1)
if node.lastGoodVersion < lastgoodver:
nodecolor=self.nodeOUTDATED
+ elif db.number_edges(node) < self.minEdges:
+ nodecolor=self.nodeLCONNS
if node.requests != '0' or node.inserts != '0' or
node.transferring_requests != '0':
transinfosize="10px"
@@ -134,7 +136,6 @@
node.location[0:7], transinfosize, node.requests,
node.inserts, node.transferring_requests,nversion))
- edge_count[gnode.name]=0
g.add_node(gnode)
@@ -143,6 +144,12 @@
#there are no dublicate edges in the database
for node_pair in node_pairs:
+ #assert node_pair.node1.name in edge_count
+ #edge_count[node_pair.node1.name]+=1
+
+ #assert node_pair.node2.name in edge_count
+ #edge_count[node_pair.node2.name]+=1
+
edgecolor = self.edgeOK
node1loc = float(node_pair.node1.location)
@@ -151,6 +158,7 @@
if node_pair.backoffcur_node1 != '0' or
node_pair.backoffcur_node2 != '0':
edgecolor= self.edgeBLOCKED
+ print "adding %s-%s" %
(node_pair.node1.name,node_pair.node2.name)
gedge = pydot.Edge(node_pair.node1.name ,
node_pair.node2.name, color=edgecolor , fontcolor=edgecolor,
label='d: %0.3f' %
distance, fontsize='9.5',arrowhead='none')
#node1 is tail, node2 is head
@@ -162,15 +170,13 @@
gedge.headlabel='%s (%s)' %
(node_pair.backoffmax_node2, node_pair.backoffcur_node2 )
gedge.arrowhead='tee'
- edge_count[gedge.get_source()]+=1
- edge_count[gedge.get_destination()]+=1
g.add_edge(gedge)
- for node_name in edge_count.keys():
- if edge_count[node_name] < self.minEdges:
- if g.get_node(node_name).color !=
self.nodeOUTDATED:
-
g.get_node(node_name).color=self.nodeLCONNS
+# for node_name in edge_count.keys():
+# if edge_count[node_name] < self.minEdges:
+# if g.get_node(node_name).color !=
self.nodeOUTDATED:
+#
g.get_node(node_name).color=self.nodeLCONNS
if self.oldnstate:
@@ -206,15 +212,14 @@
delay=60
print "delay is %d" % delay
-
+con = db.get_con()
+trans = con.transaction()
while(True):
generator = Generator(oldnstate)
#STARING TRANS
- trans=db.get_trans()
generator.gentopology(trans)
histogram.gen(trans)
#COMMITING TRANS
trans.commit()
- del trans
oldnstate = generator.oldnstate
sleep(delay)
Modified: trunk/apps/freeviz/handler.py
===================================================================
--- trunk/apps/freeviz/handler.py 2006-01-08 12:24:34 UTC (rev 7818)
+++ trunk/apps/freeviz/handler.py 2006-01-09 02:48:15 UTC (rev 7819)
@@ -3,17 +3,9 @@
import time
timedelta=360
-
-def handle(data):
+def handle(data,trans):
(nodeinfo, nodeinfos, backoffs)=parser.parse(data)
-
-
-
- #STRARTING TRANSACTION
- trans = db.get_trans()
-
-
#deleting first
if 'identity' in nodeinfo:
db.refresh(nodeinfo,trans)
@@ -32,12 +24,11 @@
else:
backoff2= {}
- db.insert(nodeinfo,nodeinfo2,
backoff1=backoff1, backoff2=backoff2,trans=trans)
+ db.insert(trans, nodeinfo,nodeinfo2,
backoff1=backoff1, backoff2=backoff2)
check_nodes(trans)
#FINISHING TRANSACTION
trans.commit()
- del trans
#check if the node is up to date, if not remove it from node-pairs
def check_nodes(trans):
@@ -47,22 +38,18 @@
if inactive(node):
nodeinfo = getInfoFromNode(node)
db.delete_conns(nodeinfo,trans)
+ node.active='N'
+ else:
+ node.active='Y'
-
def inactive(node):
nodetime = node.lastUpdate
return (curtime() - convtime(nodetime) ) > timedelta
def get_activenodes(trans):
- nodes = list(db.Node.select(connection=trans))
- assert nodes
- active_nodes=[]
+ active_nodes = list(db.Node.select( db.Node.q.active == 'Y',
connection=trans))
- for node in nodes:
- if not inactive(node):
- active_nodes.append(node)
-
return active_nodes
#time in seconds since epoch
Modified: trunk/apps/freeviz/server.py
===================================================================
--- trunk/apps/freeviz/server.py 2006-01-08 12:24:34 UTC (rev 7818)
+++ trunk/apps/freeviz/server.py 2006-01-09 02:48:15 UTC (rev 7819)
@@ -1,35 +1,66 @@
-#!/usr/bin/python
-# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
-# See LICENSE for details.
+import socket
+import threading
+import handler
+import db
+import time
-#
+PORT=23415
+NRCONS=100
+DELAY=10
-from twisted.internet.protocol import Protocol, Factory
-#from twisted.protocols.basic import NetstringReceiver
-from twisted.internet import reactor
-import handler
-### Protocol Implementation
-# This is just about the simplest possible protocol
-class Echo(Protocol):
- buffer=''
- def dataReceived(self, data):
- """As soon as any data is received, write it back."""
- self.buffer+=data
- def connectionLost(self,reason):
- print self.buffer
- print "_______________________\n\n"
+class Base(threading.Thread):
+ vlock = threading.Lock()
+ chunks=[]
+ id=0
- handler.handle(self.buffer)
+class Handler(Base):
+
+ def run(self):
+ con = db.get_con()
+ trans = con.transaction()
+ while 1:
+ Base.vlock.acquire()
+ for chunk in Base.chunks:
+ handler.handle(chunk,trans)
+ Base.chunks=[]
+ Base.vlock.release()
+ time.sleep(DELAY)
+class serv(Base):
+ chunk=''
+ def __init__(self,clnsock):
+ threading.Thread.__init__(self)
+ self.clnsock=clnsock
+ self.myid=Base.id
+ Base.id+=1
-def main():
- f = Factory()
- f.protocol = Echo
- reactor.listenTCP(23415, f)
- reactor.run()
-if __name__ == '__main__':
- main()
+ def run(self):
+ while 1:
+ k = self.clnsock.recv(1024)
+ if k == '': break
+ self.chunk+=k
+ self.clnsock.close()
+ Base.vlock.acquire()
+ Base.chunks.append(self.chunk)
+ Base.vlock.release()
+ print "%s\n________\n" % self.chunk
+
+
+lstn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+lstn.bind(('',PORT))
+lstn.listen(100)
+h = Handler()
+h.start()
+
+while 1:
+ (clnt,ap) = lstn.accept()
+ s = serv(clnt)
+ s.start()
+
+
+
+
Added: trunk/apps/freeviz/server_twist.py
===================================================================
--- trunk/apps/freeviz/server_twist.py 2006-01-08 12:24:34 UTC (rev 7818)
+++ trunk/apps/freeviz/server_twist.py 2006-01-09 02:48:15 UTC (rev 7819)
@@ -0,0 +1,48 @@
+#!/usr/bin/python
+# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+#
+
+from twisted.internet.protocol import Protocol, Factory
+#from twisted.protocols.basic import NetstringReceiver
+from twisted.internet import reactor
+import handler
+import threading
+
+
+class Handler(threading.Thread):
+ data=''
+ def __init__(self,data):
+ threading.Thread.__init__(self)
+ self.data = data
+
+
+ def run(self):
+ handler.handle(self.data)
+
+### Protocol Implementation
+
+# This is just about the simplest possible protocol
+class Echo(Protocol):
+ buffer=''
+ def dataReceived(self, data):
+ """As soon as any data is received, write it back."""
+ self.buffer+=data
+ def connectionLost(self,reason):
+ print self.buffer
+ print "_______________________\n\n"
+
+ h = Handler(self.buffer)
+ h.start()
+ h.join()
+
+
+def main():
+ f = Factory()
+ f.protocol = Echo
+ reactor.listenTCP(23415, f)
+ reactor.run()
+
+if __name__ == '__main__':
+ main()