Author: sleon
Date: 2006-01-09 02:48:15 +0000 (Mon, 09 Jan 2006)
New Revision: 7819

Added:
   trunk/apps/freeviz/server_twist.py
Modified:
   trunk/apps/freeviz/db.py
   trunk/apps/freeviz/gen.py
   trunk/apps/freeviz/handler.py
   trunk/apps/freeviz/server.py
Log:
implemented own server , which can work with synchronous sqlobjects fixed lots 
of bugs relted to threading and stability

Modified: trunk/apps/freeviz/db.py
===================================================================
--- trunk/apps/freeviz/db.py    2006-01-08 12:24:34 UTC (rev 7818)
+++ trunk/apps/freeviz/db.py    2006-01-09 02:48:15 UTC (rev 7819)
@@ -5,13 +5,15 @@
 #mport pydot


-uri = 'mysql://twisted:severe at 127.0.0.1/twisted?cache=False'
-con = connectionForURI(uri)
-sqlhub.processConnection = con
+uri = 'postgres://tiwsted:severe at 127.0.0.1/twisted?debug=True'
+#con = connectionForURI(uri)

+#sqlhub.processConnection = con
+hub = dbconnection.ConnectionHub()

 class NodePair(SQLObject):
        _cacheValue = False
+       _connection = hub
         node1 = ForeignKey('Node', notNull=True)
         node2 = ForeignKey('Node', notNull=True)

@@ -19,10 +21,12 @@
        backoffmax_node2 = StringCol(length=50, default='5000')
        backoffcur_node1 = StringCol(length=50, default='0')
        backoffcur_node2 = StringCol(length=50, default='0')
+       nodes = RelatedJoin('Node')
         index = DatabaseIndex('node1','node2', unique=True)

 class Node(SQLObject):
        _cacheValue = False
+       _connection = hub
        identity = StringCol(length=100, notNull=True)
        lastUpdate = DateTimeCol(notNull=True, default=datetime.datetime.now())
        name = StringCol(length=50, notNull=True, default='dummy')
@@ -37,23 +41,35 @@
        transferring_requests = StringCol(length=10, notNull=True, default='0')
        address = StringCol(length=32,notNull=True,default='0.0.0.0:0')

+       active=StringCol(length=1,notNull=True,default='N')
+       edges = RelatedJoin('NodePair')
        index = DatabaseIndex('identity',unique=True)
+       
+       
        #index2 = DatabaseIndex('name',unique=True)

 def init():
-       Node.createTable()
-       NodePair.createTable()
+       con = get_con()

+       Node.createTable(connection=con)
+       NodePair.createTable(connection=con)
+
 def drop():
-       Node.dropTable()
-       NodePair.dropTable()
+       con = get_con()

+       NodePair.dropTable(connection=con)
+
+       Node.dropTable(connection=con)
+       
 def reinit():
+       con = get_con() 
        drop()
        init()

-def get_trans():
-       return con.transaction()
+def get_con():
+       hub.threadConnection = connectionForURI(uri)
+       
+       return hub.getConnection()

 def getLastGoodVer(trans):
        return trans.queryOne('SELECT MAX(last_good_version) from node')[0]
@@ -61,11 +77,16 @@

 def delete_conns(nodeinfo, trans):
        nodeid = getIdFromInfo(nodeinfo,trans)
-       l = NodePair.select(connection=trans)
-       for i in l:
-               if i.node1.id == nodeid or i.node2.id == nodeid:
-                       i.delete(i.id)
+       node = Node.get(nodeid, connection=trans)
+       edges = node.edges      

+       for edge in edges:
+               node1=edge.node1
+               node2=edge.node2
+               node1.removeNodePair(edge)
+               node2.removeNodePair(edge)
+               edge.delete(edge.id)
+
 def exists(nodeinfo,trans):
        result = Node.select(Node.q.identity == 
nodeinfo['identity'],connection=trans)

@@ -74,6 +95,8 @@
        else:
                return False

+def number_edges(node):
+       return len(list(node.edges))

 def refresh(nodeinfo,trans):
        if exists(nodeinfo,trans):
@@ -95,7 +118,7 @@
        else:
                raise Exception('No such node!')

-def insert(nodeinfo1, nodeinfo2, backoff1={}, backoff2={},trans=con):
+def insert(trans,nodeinfo1, nodeinfo2, backoff1={}, backoff2={}):

         #NodePair.createTable( ifNotExists=True)
        node1 = getIdFromInfo(nodeinfo1,trans)
@@ -112,6 +135,8 @@


        bla = NodePair( node1=node1, node2=node2,connection=trans )
+       Node.get(node1, connection=trans).addNodePair(bla)
+       Node.get(node2, connection=trans).addNodePair(bla)
        if backoff1:
                bla.backoffmax_node1 = backoff1['backoffmax']
                bla.backoffcur_node1 = backoff1['backoffcur']

Modified: trunk/apps/freeviz/gen.py
===================================================================
--- trunk/apps/freeviz/gen.py   2006-01-08 12:24:34 UTC (rev 7818)
+++ trunk/apps/freeviz/gen.py   2006-01-09 02:48:15 UTC (rev 7819)
@@ -103,10 +103,10 @@
                lastgoodver = db.getLastGoodVer(trans)

                #counts edges for a node        
-               edge_count={}
+               #edge_count={}

                for node in nodes:
-       
+                       #edge_count[node.name]=0
                        nodecolor=self.nodeOK
                        transinfosize=self.defaultSize

@@ -114,6 +114,8 @@
                        nversion = self.regver.match(node.version).group(1)
                        if node.lastGoodVersion < lastgoodver:
                                nodecolor=self.nodeOUTDATED
+                       elif db.number_edges(node) < self.minEdges:
+                               nodecolor=self.nodeLCONNS

                        if node.requests != '0' or node.inserts != '0' or 
node.transferring_requests != '0':
                                transinfosize="10px"
@@ -134,7 +136,6 @@
                node.location[0:7], transinfosize, node.requests, 
                node.inserts, node.transferring_requests,nversion))

-                       edge_count[gnode.name]=0
                        g.add_node(gnode)


@@ -143,6 +144,12 @@

                #there are no dublicate edges in the database 
                for node_pair in node_pairs:
+                       #assert node_pair.node1.name in edge_count
+                       #edge_count[node_pair.node1.name]+=1
+
+                       #assert node_pair.node2.name in edge_count
+                       #edge_count[node_pair.node2.name]+=1
+
                        edgecolor = self.edgeOK

                        node1loc = float(node_pair.node1.location)
@@ -151,6 +158,7 @@

                        if node_pair.backoffcur_node1 != '0' or 
node_pair.backoffcur_node2 != '0':
                                edgecolor= self.edgeBLOCKED
+                       print "adding %s-%s" % 
(node_pair.node1.name,node_pair.node2.name)
                        gedge = pydot.Edge(node_pair.node1.name , 
node_pair.node2.name, color=edgecolor , fontcolor=edgecolor, 
                                                        label='d: %0.3f' % 
distance, fontsize='9.5',arrowhead='none')
                        #node1 is tail, node2 is head
@@ -162,15 +170,13 @@
                                        gedge.headlabel='%s (%s)' % 
(node_pair.backoffmax_node2, node_pair.backoffcur_node2 ) 
                                        gedge.arrowhead='tee'

-                       edge_count[gedge.get_source()]+=1
-                       edge_count[gedge.get_destination()]+=1
                        g.add_edge(gedge)


-               for node_name in edge_count.keys():
-                       if edge_count[node_name] < self.minEdges:
-                               if g.get_node(node_name).color != 
self.nodeOUTDATED:
-                                       
g.get_node(node_name).color=self.nodeLCONNS
+#              for node_name in edge_count.keys():
+#                      if edge_count[node_name] < self.minEdges:
+#                              if g.get_node(node_name).color != 
self.nodeOUTDATED:
+#                                      
g.get_node(node_name).color=self.nodeLCONNS


                if self.oldnstate:
@@ -206,15 +212,14 @@
        delay=60

 print "delay is %d" % delay
-
+con = db.get_con()
+trans = con.transaction()
 while(True):
        generator = Generator(oldnstate)
        #STARING TRANS
-       trans=db.get_trans()
        generator.gentopology(trans)
        histogram.gen(trans)
        #COMMITING TRANS
        trans.commit()
-       del trans
        oldnstate = generator.oldnstate
        sleep(delay)

Modified: trunk/apps/freeviz/handler.py
===================================================================
--- trunk/apps/freeviz/handler.py       2006-01-08 12:24:34 UTC (rev 7818)
+++ trunk/apps/freeviz/handler.py       2006-01-09 02:48:15 UTC (rev 7819)
@@ -3,17 +3,9 @@
 import time

 timedelta=360
-
-def handle(data):
+def handle(data,trans):
        (nodeinfo, nodeinfos, backoffs)=parser.parse(data)

-
-
-
-       #STRARTING TRANSACTION
-       trans = db.get_trans()
-
-
        #deleting first
        if 'identity' in nodeinfo:
                db.refresh(nodeinfo,trans)
@@ -32,12 +24,11 @@
                                else:
                                        backoff2= {}

-                               db.insert(nodeinfo,nodeinfo2, 
backoff1=backoff1, backoff2=backoff2,trans=trans)
+                               db.insert(trans, nodeinfo,nodeinfo2, 
backoff1=backoff1, backoff2=backoff2)
        check_nodes(trans)

        #FINISHING TRANSACTION
        trans.commit()
-       del trans

 #check if the node is up to date, if not remove it from node-pairs
 def check_nodes(trans):
@@ -47,22 +38,18 @@
                if inactive(node):
                        nodeinfo = getInfoFromNode(node)
                        db.delete_conns(nodeinfo,trans)
+                       node.active='N'
+               else:
+                       node.active='Y'

-
 def inactive(node):
        nodetime = node.lastUpdate
        return (curtime() - convtime(nodetime) ) > timedelta


 def get_activenodes(trans):
-       nodes = list(db.Node.select(connection=trans))
-       assert nodes
-       active_nodes=[]
+       active_nodes = list(db.Node.select( db.Node.q.active == 'Y', 
connection=trans))

-       for node in nodes:
-               if not inactive(node):
-                       active_nodes.append(node)
-
        return active_nodes

 #time in seconds since epoch

Modified: trunk/apps/freeviz/server.py
===================================================================
--- trunk/apps/freeviz/server.py        2006-01-08 12:24:34 UTC (rev 7818)
+++ trunk/apps/freeviz/server.py        2006-01-09 02:48:15 UTC (rev 7819)
@@ -1,35 +1,66 @@
-#!/usr/bin/python
-# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
-# See LICENSE for details.
+import socket
+import threading
+import handler
+import db
+import time

-#
+PORT=23415
+NRCONS=100
+DELAY=10

-from twisted.internet.protocol import Protocol, Factory
-#from twisted.protocols.basic import NetstringReceiver
-from twisted.internet import reactor
-import handler

-### Protocol Implementation

-# This is just about the simplest possible protocol
-class Echo(Protocol):
-    buffer=''
-    def dataReceived(self, data):
-        """As soon as any data is received, write it back."""
-       self.buffer+=data
-    def connectionLost(self,reason):
-       print self.buffer
-       print "_______________________\n\n"
+class Base(threading.Thread):
+       vlock = threading.Lock()
+       chunks=[]
+       id=0

-       handler.handle(self.buffer)
+class Handler(Base):
+       

+       def run(self):
+               con = db.get_con()              
+               trans = con.transaction()
+               while 1:
+                       Base.vlock.acquire()
+                       for chunk in Base.chunks:
+                               handler.handle(chunk,trans)
+                       Base.chunks=[]
+                       Base.vlock.release()
+                       time.sleep(DELAY)                       

+class serv(Base):
+       chunk=''
+       def __init__(self,clnsock):
+               threading.Thread.__init__(self)
+               self.clnsock=clnsock
+               self.myid=Base.id
+               Base.id+=1

-def main():
-    f = Factory()
-    f.protocol = Echo
-    reactor.listenTCP(23415, f)
-    reactor.run()

-if __name__ == '__main__':
-    main()
+       def run(self):
+               while 1:
+                       k = self.clnsock.recv(1024)
+                       if k == '': break
+                       self.chunk+=k
+               self.clnsock.close()
+               Base.vlock.acquire()
+               Base.chunks.append(self.chunk)
+               Base.vlock.release()    
+               print "%s\n________\n" % self.chunk
+
+
+lstn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+lstn.bind(('',PORT))
+lstn.listen(100)
+h = Handler()
+h.start()
+
+while 1:
+       (clnt,ap) = lstn.accept()
+       s = serv(clnt)
+       s.start()       
+
+
+
+

Added: trunk/apps/freeviz/server_twist.py
===================================================================
--- trunk/apps/freeviz/server_twist.py  2006-01-08 12:24:34 UTC (rev 7818)
+++ trunk/apps/freeviz/server_twist.py  2006-01-09 02:48:15 UTC (rev 7819)
@@ -0,0 +1,48 @@
+#!/usr/bin/python
+# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+#
+
+from twisted.internet.protocol import Protocol, Factory
+#from twisted.protocols.basic import NetstringReceiver
+from twisted.internet import reactor
+import handler
+import threading
+
+
+class Handler(threading.Thread):
+       data=''
+       def __init__(self,data):
+               threading.Thread.__init__(self)
+               self.data = data        
+
+
+       def run(self):
+               handler.handle(self.data)
+
+### Protocol Implementation
+
+# This is just about the simplest possible protocol
+class Echo(Protocol):
+    buffer=''
+    def dataReceived(self, data):
+        """As soon as any data is received, write it back."""
+       self.buffer+=data
+    def connectionLost(self,reason):
+       print self.buffer
+       print "_______________________\n\n"
+
+       h = Handler(self.buffer)
+       h.start()
+       h.join()
+
+
+def main():
+    f = Factory()
+    f.protocol = Echo
+    reactor.listenTCP(23415, f)
+    reactor.run()
+
+if __name__ == '__main__':
+    main()


Reply via email to