Hi,

find the attached test that exposes a bug in the zope SA-Session extension used 
by TG2.

Toggle the switch on the very beginning to see the 

What essentially happens is that transactions are not marked as "dirty" (or 
whatever the state is they get) when using direct SA SQL statements instead of 
passing things through the ORM-layer.

I use Elixir, but I *strongly* doubt that this changes anything - the 
DBSession is plain SA, as are the update-statement and the transaction-code.

As we had a discussion about bugs in upstream packages recently - is there 
anybody on the zope.sqlalchemy mailinglists or some such?

Versions:

SA: 5.2
zope.sqlalchemy: 0.4

Diez

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"TurboGears Trunk" group.
To post to this group, send email to turbogears-trunk@googlegroups.com
To unsubscribe from this group, send email to 
turbogears-trunk+unsubscr...@googlegroups.com
For more options, visit this group at 
http://groups.google.com/group/turbogears-trunk?hl=en
-~----------~----~----~----~------~----~------~--~---

use_zope_transactions = True




import sys
from functools import wraps

import elixir

import threading

import time

from elixir import (
    Entity,
    Field,
    using_table_options,
    String,
    Integer,
    using_options,
    )

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

from zope.sqlalchemy import ZopeTransactionExtension



#===============================================================================


uri = "postgres://localhost/test"


if use_zope_transactions:
    extension = ZopeTransactionExtension()
else:
    extension = None


DBSession = scoped_session(sessionmaker(
    autoflush=True,
    autocommit=False,
    extension=extension
    ))


engine = create_engine(uri)

DBSession.configure(bind=engine)


class LockingTest(Entity):
    using_options(tablename = 'locking_test', session=DBSession)
    value = Field(Integer, required=True)

    def increase(self):
        cls = self.__class__
        c = cls.table.c
        q = cls.table.update().where(c.id == self.id).values(
            {c.value : c.value + 1}
            )
        DBSession.execute(q)
        #DBSession.refresh(self)
        # FIXME-dir: This is needed for the final commit - why???
        #self.value = self.value

elixir.session = DBSession
elixir.metadata.bind = engine
elixir.metadata.bind.echo = True

elixir.setup_all()
elixir.create_all()


if use_zope_transactions:
    from transaction import (
        begin,
        commit,
            abort,
        manager
    )
else:

    begin = lambda: None #DBSession.begin
    commit = DBSession.commit
    rollback = DBSession.rollback


times = 10
thread_count = 10

def transactional(function):
    """
    This is a decorator that will
    provide proper transactional boundaries.

    It is nestable, so that you can call
    methods that are wrapped recursively/nested.

    Additionally, it allows to call the wrapped
    functions for a number of retries, with a
    configurable pause between retries.

    See L{set_transaction_retries}

    """
    @wraps(function)
    def wrapper(*args, **kwargs):
        begin()
        try:
            res = function(*args, **kwargs)
            commit()
            return res
        except:
            rollback()
            raise


    return wrapper

def eq(a, b):
    if not a == b:
        assert False, "%r != %r" % (a,b)



@transactional
def setup():
    lt = LockingTest(value=0)
    DBSession.flush()
    return lt.id


@transactional
def count():
    return LockingTest.get(lt_id).value

lt_id = setup()

@transactional
def work(times=times, sleep=.01):
    lt = LockingTest.get(lt_id)
    for _ in xrange(times):
        lt.increase()
        if sleep:
            time.sleep(sleep)

work()
old_c = count()
eq(old_c, times)


threads = [threading.Thread(target=work) for _ in xrange(thread_count)]

def start_and_join():
    for t in threads:
        t.start()
    for t in threads:
        t.join()

start_and_join()

c = count()

print old_c, c
assert c > old_c
assert c == 10 * thread_count + old_c

Reply via email to