dabo Commit
Revision 4097
Date: 2008-05-25 14:06:05 -0700 (Sun, 25 May 2008)
Author: Uwe_Grauer
Trac: http://svn.dabodev.com/trac/dabo/changeset/4097
Changed:
U trunk/dabo/db/dbOracle.py
U trunk/dabo/db/test/test_dCursorMixin.py
Log:
First draft of Oracle support.
As cx_Oracle currently doesn't support Unicode, this is for future reference
only.
Diff:
Modified: trunk/dabo/db/dbOracle.py
===================================================================
--- trunk/dabo/db/dbOracle.py 2008-05-23 18:36:01 UTC (rev 4096)
+++ trunk/dabo/db/dbOracle.py 2008-05-25 21:06:05 UTC (rev 4097)
@@ -1,60 +1,38 @@
# -*- coding: utf-8 -*-
-""" This is a template for creating new backend-specific scripts. To create
-a script to support a database not yet suppported by Dabo, make a copy
-of this file in the dabo/db directory, and name the copy 'dbProduct.py',
-where 'Product' is the actual name of the database (e.g., dbMySQL.py,
-dbFirebird.py, etc.)
-This template uses 'NEWDATABASE' as the name of the database; you
-should replace this with the actual name of the database
-(e.g., Oracle, PostgreSQL, etc.)
+#
+# The Python database module used for Oracle (cx_Oracle) currently doesn't
+# support Unicode, so this backend is kept only as a reference for further
+# development.
+#
-Then go down through each section marked with TODO comments, and
-modify the code so that it works correctly for this particular database. As
-soon as you know that it works, remove the TODO comment, and replace it
-with anything that might be relevant.
-
-These database-specific scripts are designed to abstract out those parts
-of the code that can vary among the various products out there. By
-customizing the code in these methods, the standard cursor works great
-in the framework with any database backend. However, if you find
-something about your database that simply can't be fixed by
-customizing these methods, report it to the dabo-dev list; it may require
-some refactoring of the code to handle a situation that is unique to this
-particular database.
-"""
import datetime
from dabo.dLocalize import _
from dBackend import dBackend
-class NEWDATABASE(dBackend):
+class Oracle(dBackend):
def __init__(self):
+ import cx_Oracle as dbapi
dBackend.__init__(self)
- #### TODO: Customize with name of dbapi module
- self.dbModuleName = "???"
-
-
+ self.dbModuleName = "cx_Oracle"
+ self.dbapi = dbapi
+
def getConnection(self, connectInfo, **kwargs):
- #### TODO: replace 'ZZZ' with dbapi module name
- import ZZZ as dbapi
+ import cx_Oracle as dbapi
+ self.conn_user = connectInfo.User
port = connectInfo.Port
if not port:
- #### TODO: Customize with standard NEWDATABASE port
- port = -1
-
- #### TODO: Customize to make correct connect string
- self._connection = dbapi.connect(host=connectInfo.Host,
- user = connectInfo.User, passwd =
connectInfo.revealPW(),
- db=connectInfo.Database, port=port, **kwargs)
+ port = 1521
+ dsn = dbapi.makedsn(connectInfo.Host, port,
connectInfo.Database)
+ self._connection = dbapi.connect(user = connectInfo.User,
+ password =
connectInfo.revealPW(),
+ dsn = dsn)
return self._connection
def getDictCursorClass(self):
- #### TODO: Replace 'ZZZ' with appropriate NEWDATABASE dbapi
- #### module class or just a standard cursor, if it doesn't
offer Dict cursors.
- return ZZZ.DictCursor
+ return self.dbapi.Cursor
def escQuote(self, val):
@@ -66,19 +44,23 @@
return qt + val.replace(sl, sl+sl).replace(qt, sl+qt) + qt
+ def processFields(self, txt):
+ """ Return txt, converting a unicode value to a plain str first. """
+ # this was used for testing only
+ # NOTE(review): indentation was flattened in this diff; the final return
+ # is presumably at method level, so non-unicode input is returned as is.
+ if isinstance(txt, unicode):
+ # str() encodes with the interpreter's default codec (normally ASCII on
+ # Python 2), so text with non-ASCII characters raises UnicodeEncodeError.
+ txt = str(txt)
+ return txt
+
def formatDateTime(self, val):
""" We need to wrap the value in quotes. """
- #### TODO: Make sure that the format for DateTime
- #### values is returned correctly
sqt = "'" # single quote
val = self._stringify(val)
return "%s%s%s" % (sqt, val, sqt)
def getTables(self, cursor, includeSystemTables=False):
- #### TODO: Verify that this works with NEWDATABASE, including
- #### the option for including/excluding system tables.
- cursor.execute("show tables")
+ #sqlstr = "select table_name from all_tables where
tablespace_name NOT IN ('SYSTEM', 'SYSAUX')"
+ sqlstr = "select table_name from user_tables"
+ cursor.execute(sqlstr)
rs = cursor.getDataSet()
tables = []
for record in rs:
@@ -87,66 +69,105 @@
def getTableRecordCount(self, tableName, cursor):
- #### TODO: Verify that this is the correct syntax for
NEWDATABASE
cursor.execute("select count(*) as ncount from %s" % tableName)
return cursor.getDataSet()[0][0]
def getFields(self, tableName, cursor):
- #### TODO: Modify for NEWDATABASE syntax
- cursor.execute("describe %s" % tableName)
+ # get PK
+ print "dbOracle.getFields(): ", tableName
+ sqlstr = """SELECT cols.column_name FROM all_constraints cons,
all_cons_columns cols
+ WHERE cols.table_name = '%s' AND cons.constraint_type = 'P'
+ AND cons.constraint_name = cols.constraint_name AND cons.owner
= cols.owner
+ ORDER BY cols.table_name, cols.position"""
+
+ sqlstr = sqlstr % tableName
+ cursor.execute(sqlstr)
+ rs = cursor.getDataSet(rows=1)
+ #print "rs = cursor.getDataSet(): ", rs
+ try:
+ pkField = rs[0]["COLUMN_NAME"].strip()
+ except:
+ pkField = None
+ # Now get the field info
+ sqlstr = """SELECT column_name, data_type,
COALESCE(data_precision, data_length) "LENGTH",
+ data_scale "SCALE" FROM all_tab_columns WHERE table_name = '%s'
ORDER BY column_id"""
+ cursor.execute(sqlstr % tableName)
rs = cursor.getDataSet()
- fldDesc = cursor.description
- # The field name is the first element of the tuple. Find the
- # first entry with the field name 'Key'; that will be the
- # position for the PK flag
- for i in range(len(fldDesc)):
- if fldDesc[i][0] == 'Key':
- pkPos = i
- break
-
fields = []
for r in rs:
- #### TODO: Alter these so that they match the field type
- #### names returned by NEWDATABASE.
- name = r[0]
- ft = r[1]
- if ft.split()[0] == "tinyint(1)":
- ft = "B"
- elif "int" in ft:
- ft = "I"
- elif "varchar" in ft:
- # will be followed by length
- ln = int(ft.split("(")[1].split(")")[0])
- if ln > 255:
- ft = "M"
+ #### TODO: add missing field types
+ fname = r["COLUMN_NAME"].strip()
+ ftype = r["DATA_TYPE"].strip()
+ if ftype == "NUMBER":
+ if r["SCALE"] == 0:
+ ft = "I"
else:
- ft = "C"
- elif "char" in ft :
- ft = "C"
- elif "text" in ft:
+ ft = "N"
+ elif ftype == "VARCHAR2":
ft = "M"
- elif "decimal" in ft:
- ft = "N"
- elif "datetime" in ft:
+ elif ftype == "DATE":
+ ft = "D"
+ elif ftype == "TIMESTAMP(6)":
ft = "T"
- elif "date" in ft:
- ft = "D"
- elif "enum" in ft:
- ft = "C"
else:
+ print r
+ print "unknown ftype: ", ftype
ft = "?"
- pk = (r[pkPos] == "PRI")
+ if pkField is None:
+ # No pk defined for the table
+ pk = False
+ else:
+ pk = ( r["COLUMN_NAME"].lower() ==
pkField.lower() )
- fields.append((name.strip(), ft, pk))
+ fields.append((fname.lower(), ft, pk))
return tuple(fields)
+ def getLimitWord(self):
+ """ Return the leading part of the row-limit condition.
+
+ Oracle uses something like "where rownum <= num", so the returned text
+ is a comparison prefix ("rownum <=") rather than a LIMIT-style keyword.
+ """
+ return "rownum <="
+
+    def formSQL(self, fieldClause, fromClause, joinClause,
+            whereClause, groupByClause, orderByClause, limitClause):
+        """ Assemble a SELECT statement with the limit folded into WHERE.
+
+        Oracle wants the limit as part of the where clause (see
+        getLimitWord()), so limitClause is ANDed onto whereClause, or
+        becomes the where clause when there is none. Empty clauses are
+        skipped; the rest are joined with newlines after "SELECT ".
+
+        NOTE(review): Oracle assigns rownum before ORDER BY is applied, so
+        a limit combined with orderByClause restricts the rows first and
+        sorts afterwards. A "first n rows in sort order" query needs the
+        ordered statement wrapped in a subquery.
+        """
+        if limitClause:
+            if whereClause:
+                parts = whereClause.split(None, 1)
+                if len(parts) == 2 and parts[0].lower() == "where":
+                    # Parenthesize the existing condition so the limit
+                    # applies to all of it; AND binds tighter than OR.
+                    whereClause = "where (%s) and %s" % (parts[1].strip(),
+                            limitClause)
+                else:
+                    whereClause = "%s and %s" % (whereClause, limitClause)
+            else:
+                whereClause = "where %s" % limitClause
+        clauses = (fieldClause, fromClause, joinClause,
+                whereClause, groupByClause, orderByClause)
+        # The clauses are passed through unchanged: changing their case
+        # would also alter string literals and quoted identifiers.
+        return "SELECT " + "\n".join([clause for clause in clauses if clause])
+
+    def beginTransaction(self, cursor):
+        """ Begin a SQL transaction unless the connection already has one.
+
+        cursor is not used by this implementation. Returns True when a
+        transaction was started by this call, False otherwise.
+        """
+        # This module only imports dabo.dLocalize._, which does not bind
+        # the name "dabo"; import the package here for dbActivityLog.
+        import dabo
+        ret = False
+        # used for testing
+        # NOTE(review): _has_transaction() is a private method of the
+        # connection object; confirm that cx_Oracle connections provide it.
+        if not self._connection._has_transaction():
+            self._connection.begin()
+            dabo.dbActivityLog.write("SQL: begin")
+            ret = True
+        return ret
+
def getWordMatchFormat(self):
- #### TODO: If NEWDATABASE supports fulltext searches with
matching by
- #### words, create an expression that will execute such a
search
- #### The format must have the expressions %(table)s,
%(field)s and %(value)s
- #### which will be replaced with the table, field, and value
strings,
- #### respectively. If NEWDATABASE does not support word
searches, delete
- #### this method to use the default backend class's method.
return """ match (%(table)s.%(field)s) against ("%(value)s") """
+
+
+#
+# only for testing
+#
+def main():
+ """ Open a connection through the Oracle backend as a manual smoke test. """
+ from dabo.db.dConnectInfo import dConnectInfo
+
+ ora = Oracle()
+ # NOTE(review): user name and password are hard-coded for the author's
+ # own test database, and no Host is passed; adjust before running.
+ connInfo = dConnectInfo(Name="myconn", DbType="Oracle", Port=1521,
+ User="fwadm",
Password="V7EE74E49H6BV27TA0J65G2AS21", Database='XE')
+ # The returned connection is only assigned; nothing else is done with it.
+ conn = ora.getConnection(connInfo)
+
+
+
+if __name__ == '__main__':
+ main()
Modified: trunk/dabo/db/test/test_dCursorMixin.py
===================================================================
--- trunk/dabo/db/test/test_dCursorMixin.py 2008-05-23 18:36:01 UTC (rev
4096)
+++ trunk/dabo/db/test/test_dCursorMixin.py 2008-05-25 21:06:05 UTC (rev
4097)
@@ -12,6 +12,7 @@
"firebird": False,
"postgresql": False,
"mssql": False,
+ "oracle": False,
}
# Convert the flags into class references. Setting to object will keep the
tests
@@ -325,6 +326,40 @@
""" % self.temp_table_name)
+class Test_dCursorMixin_oracle(Test_dCursorMixin, unittest.TestCase):
+    """ Run the shared dCursorMixin tests against an Oracle database.
+
+    The tests work on a scratch table with a randomly generated name,
+    which tearDown() drops again.
+    """
+
+    def setUp(self):
+        # NOTE(review): host, user and password are hard-coded for the
+        # author's own test server; adjust them before enabling "oracle"
+        # in db_tests.
+        con = dabo.db.dConnection(DbType="Oracle", User="fwadm",
+                password="V7EE74E49H6BV27TA0J65G2AS21", Database="XE",
+                Host="athlon28")
+        self.cur = con.getDaboCursor()
+        self.temp_table_name = "unittest%s" % getRandomUUID().replace("-", "")[-17:]
+        super(Test_dCursorMixin_oracle, self).setUp()
+
+    def tearDown(self):
+        self.cur.execute("drop table %s" % self.temp_table_name)
+        # super() must be given this class: naming another backend's test
+        # class raises TypeError because self is not an instance of it.
+        super(Test_dCursorMixin_oracle, self).tearDown()
+
+    def createSchema(self):
+        """ Create the scratch table and insert three sample rows. """
+        # Oracle string literals take single quotes; double quotes denote
+        # identifiers and would be rejected in a VALUES list.
+        # NOTE(review): no pk value is supplied although pk is the primary
+        # key; a plain INTEGER column is presumably not auto-generated by
+        # Oracle, so these inserts may need explicit keys or a sequence.
+        # Confirm against a live server.
+        cur = self.cur
+        cur.execute("""
+create table %s (pk INTEGER PRIMARY KEY, cfield CHAR (32), ifield INT, nfield DECIMAL (8,2))
+""" % self.temp_table_name)
+        cur.execute("""
+insert into %s (cfield, ifield, nfield) values ('Paul Keith McNett', 23, 23.23)
+""" % self.temp_table_name)
+        cur.execute("""
+insert into %s (cfield, ifield, nfield) values ('Edward Leafe', 42, 42.42)
+""" % self.temp_table_name)
+        cur.execute("""
+insert into %s (cfield, ifield, nfield) values ('Carl Karsten', 10223, 23032.76)
+""" % self.temp_table_name)
+
+    def createNullRecord(self):
+        """ Insert a row whose non-key fields are all NULL. """
+        self.cur.AuxCursor.execute("""
+insert into %s (cfield, ifield, nfield) values (NULL, NULL, NULL)
+""" % self.temp_table_name)
+
+
+
+
class Test_dCursorMixin_firebird(Test_dCursorMixin, unittest.TestCase):
## NOTE: Firebird not set up completely yet. What is here is courtesy
Uwe
## Grauer. We need insert statements, and we need a firebird
server.
@@ -387,7 +422,8 @@
testClasses = []
mapping = {"sqlite": Test_dCursorMixin_sqlite,
"mysql": Test_dCursorMixin_mysql,
- "firebird": Test_dCursorMixin_firebird}
+ "firebird": Test_dCursorMixin_firebird,
+ "oracle": Test_dCursorMixin_oracle}
for k, v in db_tests.items():
if v:
testClasses.append(mapping[k])
_______________________________________________
Post Messages to: [email protected]
Subscription Maintenance: http://leafe.com/mailman/listinfo/dabo-dev
Searchable Archives: http://leafe.com/archives/search/dabo-dev
This message: http://leafe.com/archives/byMID/[EMAIL PROTECTED]