Hi, This is what I tried first, but it's not working.
    pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
    pg.run(...)

This part of the code will always use a new connection, and therefore the temp
table created in the preoperator will not be accessible for the COPY command.
For this to work, I had to use a cursor instead.

Flo
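A minimal sketch of that cursor-based execute(), in case it helps someone else.
It is untested, and the copy_sql / filepath attribute names are only
illustrative; the other attributes follow the demo operator quoted below:

    def execute(self, context):
        pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        conn = pg.get_conn()                 # one connection => one session
        try:
            cur = conn.cursor()
            if self.pg_preoperator:
                self.log.info("Running Postgres preoperator")
                cur.execute(self.pg_preoperator)   # e.g. CREATE TEMP TABLE ... ON COMMIT DROP

            self.log.info("Loading file into temp table")
            # copy_sql must be a server-side "COPY ... FROM STDIN" statement,
            # not a psql \COPY meta-command; the local file is streamed in.
            with open(self.filepath, 'rb') as f:
                cur.copy_expert(self.copy_sql, f)

            if self.pg_postoperator:
                self.log.info("Running Postgres postoperator")
                cur.execute(self.pg_postoperator)

            conn.commit()    # ON COMMIT DROP cleans up the temp table here
        finally:
            conn.close()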
On Sat, Mar 23, 2019 at 10:22 AM Jiajie Zhong <[email protected]> wrote:

> I wrote some demo code here. Maybe it won't work as-is, but I think the
> idea is right.
>
> # create a file named file_to_Pg.py
> from airflow.models import BaseOperator
> from airflow.hooks.postgres_hook import PostgresHook
>
> class FileToPgTransfer(BaseOperator):
>     def __init__(self, postgres_conn_id, pg_preoperator, sql,
>                  pg_postoperator, ... , *args, **kwargs):
>         super().__init__(*args, **kwargs)
>         self.postgres_conn_id = postgres_conn_id
>         self.pg_preoperator = pg_preoperator
>         self.sql = sql
>         self.pg_postoperator = pg_postoperator
>         # init anything else here
>
>     def execute(self, context):
>         pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
>
>         if self.pg_preoperator:
>             self.log.info("Running Postgres preoperator")
>             pg.run(self.pg_preoperator)
>
>         self.log.info("Loading file to pg table.")
>         pg.run(sql=self.sql)
>
>         if self.pg_postoperator:
>             self.log.info("Running Postgres postoperator")
>             pg.run(self.pg_postoperator)
>
>
> # you could use the code below in a DAG file
> task = FileToPgTransfer(
>     postgres_conn_id='postgres_conn_id',
>     pg_preoperator='CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH NO DATA',
>     sql="\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV ENCODING 'LATIN1' NULL ''",
>     pg_postoperator='DELETE FROM catalog_tmp WHERE code IS NULL',
> )
>
>
> Best wish.
> -- jiajie
> ________________________________
> From: Flo Rance <[email protected]>
> Sent: Thursday, March 21, 2019 22:31
> To: [email protected]
> Subject: Re: PostgreSQL hook
>
> I've found some more information that seems to confirm my suspicion.
>
> The connection is not persistent between the pg_preoperator step and the
> copy_expert one.
>
> There's a suggestion to make persistence a property of the connection:
>
> https://stackoverflow.com/questions/50858770/airflow-retain-the-same-database-connection
>
> I would be very grateful if someone could help me implement that in my
> operator that uses the PostgreSQL hook.
>
> Regards,
> Flo
>
> On Thu, Mar 21, 2019 at 2:38 PM Flo Rance <[email protected]> wrote:
>
> > No problem.
> >
> > Thanks for the link, I was able to create a plugin and an operator that
> > do almost what I want.
> >
> > My only issue is regarding the temp table, because it's not available
> > when I call copy_expert. So it seems to me that it's not the same session
> > as the one that created the temp table, because if I use a standard
> > table I don't have this issue.
> >
> > Does anyone have an idea how to fix this?
> >
> > Regards,
> > Flo
> >
> > On Thu, Mar 21, 2019 at 9:36 AM jiajie zhong <[email protected]> wrote:
> >
> >> Using Airflow plugins, maybe you should take a look at
> >> https://airflow.apache.org/plugins.html.
> >>
> >> BTW, sorry for sending a duplicate e-mail last night, due to a network
> >> failure.
> >>
> >> Best wish.
> >> - jiajie
> >>
> >> ________________________________
> >>
> >> Hi,
> >>
> >> Thank you for this explanation. If I summarize, I'll have to write a
> >> file_to_postgres Operator, with pg_preoperator and pg_postoperator
> >> parameters.
> >>
> >> Just a simple question: where should I add and store this Operator in
> >> the airflow ecosystem?
> >>
> >> Regards,
> >> Flo
> >>
> >> On Wed, Mar 20, 2019 at 5:05 PM jiajie zhong <[email protected]> wrote:
> >>
> >> > Hi, Flo. I am not good at PG, but I found this code in our master branch:
> >> >
> >> > https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/hooks/postgres_hook.py#L63-L89
> >> >
> >> > I think maybe this is what you are looking for.
> >> >
> >> > Also, we recommend using an Operator instead of a Hook to do this. But
> >> > we have no "local-file-pg-operator" yet; maybe you should add this
> >> > function yourself.
> >> >
> >> > BTW, I think
> >> >
> >> > BEGIN;
> >> >
> >> > CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog
> >> > WITH NO DATA;
> >> >
> >> > \COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';'
> >> > CSV ENCODING 'LATIN1' NULL '';
> >> >
> >> > DELETE FROM catalog_tmp WHERE code IS NULL;
> >> > ...
> >> > COMMIT;
> >> >
> >> > what you describe is a transaction, and should be done in a single
> >> > operator. You could write code just like
> >> >
> >> > https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/operators/hive_to_mysql.py#L70-L71
> >> >
> >> > which has "pg_preoperator" and "pg_postoperator" parameters, but
> >> > extracts data from a local file instead of Hive.
> >> >
> >> > ________________________________
> >> > From: Flo Rance <[email protected]>
> >> > Sent: Wednesday, March 20, 2019 23:30
> >> > To: [email protected]
> >> > Subject: PostgreSQL hook
> >> >
> >> > Hi,
> >> >
> >> > I don't know if this is the correct place to ask.
> >> >
> >> > I'm trying to implement one of my cronjobs using airflow. One of the
> >> > tasks is to load files into a temporary table and then update another
> >> > table in a postgres db.
> >> > For that, I was previously using a sql script like this:
> >> >
> >> > BEGIN;
> >> >
> >> > CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog
> >> > WITH NO DATA;
> >> >
> >> > \COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';'
> >> > CSV ENCODING 'LATIN1' NULL '';
> >> >
> >> > DELETE FROM catalog_tmp WHERE code IS NULL;
> >> > ...
> >> > COMMIT;
> >> >
> >> > I would like to replace \copy with copy_expert from the postgresql
> >> > hook. Is that realistic?
> >> > If yes, how can I combine a sql script and that hook in one task?
> >> >
> >> > Regards,
> >> > Flo
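To close the loop on the question in the original message at the bottom of the
thread: the hook's copy_expert(sql, filename) can indeed replace the client-side
\COPY. A standalone call looks roughly like this (the connection id and the
elided column list are placeholders taken from the example above):

    from airflow.hooks.postgres_hook import PostgresHook

    pg = PostgresHook(postgres_conn_id='postgres_conn_id')
    pg.copy_expert(
        sql="COPY catalog_tmp (...) FROM STDIN WITH DELIMITER ';' CSV ENCODING 'LATIN1' NULL ''",
        filename='/home/cat/catalog.csv',
    )

Just keep in mind the caveat discussed above: each hook call opens its own
connection, so a temp table created by a previous pg.run() call will not be
visible to it.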
