Re: [sqlalchemy] Session management for general functions within a class

2022-04-29 Thread Andrew Martin
Hi Simon, thank you for your thoughts. Sorry about the incomplete code. 
This project has gotten out of control, and I'm tired and a little burned 
out and have a launch deadline for Monday, and I was hoping this would be 
enough to uncover some basic stupidity in my approach. As long as I don't 
care about code being duplicated in lots of places, it all works. But 
refactoring to clean things up is uncovering some fundamental lack of 
understanding about how this stuff works.

The session is about as basic as it gets from a utility function.

from sqlalchemy import create_engine
def get_db_session() -> Session:
engine = create_engine(
f"postgresql://{settings.PG_USER}:{settings.PG_PASS}@{settings.PG_DSN}:{settings.PG_PORT}/{settings.DATABLENDER_DB}"
 
# noqa: E501
)
session = Session(engine)
return session

So in my Mixin class there's just 

from app.utils import get_db_session

and self.db_session = get_db_session()

Everything else is working from that. I'll get a complete working example 
up tonight or tomorrow. It's gotten complex because some of the logic is 
distributed and in Airflow DAGs and tasks and stuff. It's not super easy to 
pull out a class and some models and have something to demonstrate. And I 
sure as hell didn't want to just dump the whole thing on people here and be 
like, "Hey can you fix this for me?" lol!

On Friday, April 29, 2022 at 5:51:26 AM UTC-5 Simon King wrote:

> It's difficult to debug this without a script that we can run to reproduce 
> the problem. What kind of object is self.db_session? You use it as a 
> context manager without calling it, so I don't think it can be a 
> sessionmaker or a session.
>
> You're nesting calls to the context manager:
>
> # in load_new_data
> with self.db_session as outersession:
> # add new_obj to outersession
> # call move_to_error
> with self.db_session as innersession:
> # add new_obj to innersession
>
> Are innersession and outersession supposed to be the same object? If they 
> are different sessions, you're trying to add new_obj to both of them, which 
> is going to be a problem.
>
> If it were me, I would explicitly pass the session to the move_to_error 
> method. If you don't like that, you can also use 
> sqlalchemy.orm.object_session to get the session that new_obj already 
> belongs to.
>
> Hope that helps,
>
> Simon
>
> On Fri, Apr 29, 2022 at 5:10 AM Andrew Martin  wrote:
>
>> Hi all, I'm struggling a bit with best practices for my ETL application.
>>
>> Each part of the ETL app is completely separate from the others, but I 
>> have a MixIn for some common functions that each of them need to do, like 
>> move this record to error if there's a data integrity problem. Or move this 
>> record to manual review if there's insufficient data to move it along to 
>> the next stage of the ETL.
>>
>> The problem I'm having is that I don't understand the correct way to pass 
>> an object to a function, update it, and eventually commit it.
>>
>> I have for example:
>>
>> class DataMoverMixin:
>> def __init__(self) -> None:
>> self.db_session = get_db_session()
>> 
>>
>> self.move_to_error(obj: Any, error_stage: str, traceback: Exception) 
>> -> bool:
>> logger.info("Moving object to error.")
>> json_data = json.dumps(obj, cls=AlchemyEncoder)
>> e = Error(
>> id=obj.id,
>> error_stage=error_stage,
>> error_message=repr(traceback),
>> error_data=json_data,
>> )
>> obj.status = "error"
>> with self.db_session as session:
>> session.add(e)
>> session.add(obj)
>> session.commit()
>> logger.info("Successfully moved object to error.")
>> return True
>>
>> class IngestDataManager(DataMoverMixin):
>> def __init__(self):
>> super().__init__()
>> 
>>
>>
>> def load_new_data(self, accounts: List[Dict]) -> bool:
>> for acc in accounts:
>> new_obj = NewObj(**acc)
>> with self.db_session as session:
>> session.add(new_obj)
>> session.commit()
>> # now the raw data is loaded, I need to check if it 
>> conforms and do some stuff  with the newly created id. 
>> session.refresh(new_obj)
>> if not new_obj.important_stuff:
>>  self.move_to_error(new_obj, 
>> "ingest_integrity_error", f"missing {important stuff} for account_id: {
>> new_obj.id}
>>
>>
>> This is the simplest example of what does and doesn't work. And I can 
>> tell from the errors that I must be doing something very anti pattern, but 
>> I can't quite figure out what.
>>
>> This pattern gives me a DetachedInstanceError.
>>
>> So if I change Mixin.move_to_error like so:
>>
>> . . . 
>> with self.db_session as session:
>> session.refresh(obj)
>> obj.status = "error"
>> session.add(e)
>> session.add(obj)
>> 

Re: [sqlalchemy] Session management for general functions within a class

2022-04-29 Thread Simon King
It's difficult to debug this without a script that we can run to reproduce
the problem. What kind of object is self.db_session? You use it as a
context manager without calling it, so I don't think it can be a
sessionmaker or a session.

You're nesting calls to the context manager:

# in load_new_data
with self.db_session as outersession:
# add new_obj to outersession
# call move_to_error
with self.db_session as innersession:
# add new_obj to innersession

Are innersession and outersession supposed to be the same object? If they
are different sessions, you're trying to add new_obj to both of them, which
is going to be a problem.

If it were me, I would explicitly pass the session to the move_to_error
method. If you don't like that, you can also use
sqlalchemy.orm.object_session to get the session that new_obj already
belongs to.

Hope that helps,

Simon

On Fri, Apr 29, 2022 at 5:10 AM Andrew Martin  wrote:

> Hi all, I'm struggling a bit with best practices for my ETL application.
>
> Each part of the ETL app is completely separate from the others, but I
> have a MixIn for some common functions that each of them need to do, like
> move this record to error if there's a data integrity problem. Or move this
> record to manual review if there's insufficient data to move it along to
> the next stage of the ETL.
>
> The problem I'm having is that I don't understand the correct way to pass
> an object to a function, update it, and eventually commit it.
>
> I have for example:
>
> class DataMoverMixin:
> def __init__(self) -> None:
> self.db_session = get_db_session()
> 
>
> self.move_to_error(obj: Any, error_stage: str, traceback: Exception)
> -> bool:
> logger.info("Moving object to error.")
> json_data = json.dumps(obj, cls=AlchemyEncoder)
> e = Error(
> id=obj.id,
> error_stage=error_stage,
> error_message=repr(traceback),
> error_data=json_data,
> )
> obj.status = "error"
> with self.db_session as session:
> session.add(e)
> session.add(obj)
> session.commit()
> logger.info("Successfully moved object to error.")
> return True
>
> class IngestDataManager(DataMoverMixin):
> def __init__(self):
> super().__init__()
> 
>
>
> def load_new_data(self, accounts: List[Dict]) -> bool:
> for acc in accounts:
> new_obj = NewObj(**acc)
> with self.db_session as session:
> session.add(new_obj)
> session.commit()
> # now the raw data is loaded, I need to check if it
> conforms and do some stuff  with the newly created id.
> session.refresh(new_obj)
> if not new_obj.important_stuff:
>  self.move_to_error(new_obj, "ingest_integrity_error",
> f"missing {important stuff} for account_id: {new_obj.id}
>
>
> This is the simplest example of what does and doesn't work. And I can tell
> from the errors that I must be doing something very anti pattern, but I
> can't quite figure out what.
>
> This pattern gives me a DetachedInstanceError.
>
> So if I change Mixin.move_to_error like so:
>
> . . .
> with self.db_session as session:
> session.refresh(obj)
> obj.status = "error"
> session.add(e)
> session.add(obj)
> session.commit()
> . . .
>
> I get no error. But also the changes to the obj are not actually committed
> to the DB.
> The new record for error is committed.
>
> My expectation was that by attaching the session to the class that any
> method on the class would reference the same session, and that using the
> context manager was just a good practice to open and close it. But that
> doesn't seem to be the case.
>
> I might certainly be wrong, but it appears that when you pass an
> SQLAlchemy object to a function inside of a session context manager, it
> does not carry the session with it?
>
> And also reopening what I think is the session in a context manager fixes
> that but also then doesn't allow me to update the object?
>
> I guess I'm just kinda confused, and I'm sure there's a better way to do
> this.
>
> I've searched around a lot to try and understand this problem, but for
> whatever reason, nothing has clicked for me about what I'm doing wrong.
>
> Appreciate any help from people.
>
> -andrew
>
>
> --
> SQLAlchemy -
> The Python SQL Toolkit and Object Relational Mapper
>
> http://www.sqlalchemy.org/
>
> To post example code, please provide an MCVE: Minimal, Complete, and
> Verifiable Example. See http://stackoverflow.com/help/mcve for a full
> description.
> ---
> You received this message because you are subscribed to the Google Groups
> "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to sqlalchemy+unsubscr...@googlegroups.com.
> To view this discussion on the web visit
> 

[sqlalchemy] Session management for general functions within a class

2022-04-28 Thread Andrew Martin
Hi all, I'm struggling a bit with best practices for my ETL application.

Each part of the ETL app is completely separate from the others, but I have 
a MixIn for some common functions that each of them need to do, like move 
this record to error if there's a data integrity problem. Or move this 
record to manual review if there's insufficient data to move it along to 
the next stage of the ETL.

The problem I'm having is that I don't understand the correct way to pass 
an object to a function, update it, and eventually commit it.

I have for example:

class DataMoverMixin:
def __init__(self) -> None:
self.db_session = get_db_session()


self.move_to_error(obj: Any, error_stage: str, traceback: Exception) -> 
bool:
logger.info("Moving object to error.")
json_data = json.dumps(obj, cls=AlchemyEncoder)
e = Error(
id=obj.id,
error_stage=error_stage,
error_message=repr(traceback),
error_data=json_data,
)
obj.status = "error"
with self.db_session as session:
session.add(e)
session.add(obj)
session.commit()
logger.info("Successfully moved object to error.")
return True

class IngestDataManager(DataMoverMixin):
def __init__(self):
super().__init__()



def load_new_data(self, accounts: List[Dict]) -> bool:
for acc in accounts:
new_obj = NewObj(**acc)
with self.db_session as session:
session.add(new_obj)
session.commit()
# now the raw data is loaded, I need to check if it 
conforms and do some stuff  with the newly created id. 
session.refresh(new_obj)
if not new_obj.important_stuff:
 self.move_to_error(new_obj, "ingest_integrity_error", 
f"missing {important stuff} for account_id: {new_obj.id}


This is the simplest example of what does and doesn't work. And I can tell 
from the errors that I must be doing something very anti pattern, but I 
can't quite figure out what.

This pattern gives me a DetachedInstanceError.

So if I change Mixin.move_to_error like so:

. . . 
with self.db_session as session:
session.refresh(obj)
obj.status = "error"
session.add(e)
session.add(obj)
session.commit()
. . .

I get no error. But also the changes to the obj are not actually committed 
to the DB.
The new record for error is committed.
 
My expectation was that by attaching the session to the class that any 
method on the class would reference the same session, and that using the 
context manager was just a good practice to open and close it. But that 
doesn't seem to be the case. 

I might certainly be wrong, but it appears that when you pass an SQLAlchemy 
object to a function inside of a session context manager, it does not carry 
the session with it?

And also reopening what I think is the session in a context manager fixes 
that but also then doesn't allow me to update the object?

I guess I'm just kinda confused, and I'm sure there's a better way to do 
this.

I've searched around a lot to try and understand this problem, but for 
whatever reason, nothing has clicked for me about what I'm doing wrong.

Appreciate any help from people.

-andrew


-- 
SQLAlchemy - 
The Python SQL Toolkit and Object Relational Mapper

http://www.sqlalchemy.org/

To post example code, please provide an MCVE: Minimal, Complete, and Verifiable 
Example.  See  http://stackoverflow.com/help/mcve for a full description.
--- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/sqlalchemy/628f6d67-51ce-4251-a90e-9f27341b793cn%40googlegroups.com.