Here is some demo code. It may not work as written, but I think the idea is right.

# create a file named file_to_Pg.py
from airflow.models import BaseOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.utils.decorators import apply_defaults

class FileToPgTransfer(BaseOperator):
    @apply_defaults
    def __init__(self, postgres_conn_id, pg_preoperator, sql, pg_postoperator,
                 *args, **kwargs):
        super(FileToPgTransfer, self).__init__(*args, **kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.pg_preoperator = pg_preoperator
        self.sql = sql
        self.pg_postoperator = pg_postoperator
        # assign any other parameters here

    def execute(self, context):
        pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)

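        # NOTE: each pg.run() call below opens its own connection, so a TEMP
        # table created by the preoperator will not be visible to the later
        # calls (see the single-session sketch after the DAG example)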
        if self.pg_preoperator:
            self.log.info("Running Postgres preoperator")
            pg.run(self.pg_preoperator)

        self.log.info("loading file to pg table.")
        pg.run(sql=self.sql)

        if self.pg_postoperator:
            self.log.info("Running Postgres postoperator")
            pg.run(self.pg_postoperator)


# you could then use this operator in a DAG file like below
task = FileToPgTransfer(
    task_id='file_to_pg',
    postgres_conn_id='postgres_conn_id',
    pg_preoperator="CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT *"
                   " FROM catalog WITH NO DATA",
    # NOTE: \COPY is a psql meta-command; psycopg2 (which pg.run uses) cannot
    # execute it. A server-side COPY, or copy_expert with COPY ... FROM STDIN,
    # is needed instead (see the sketch below).
    sql=r"\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER"
        r" ';' CSV ENCODING 'LATIN1' NULL ''",
    pg_postoperator='DELETE FROM catalog_tmp WHERE code IS NULL',
)
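
Note that each PostgresHook.run() or copy_expert() call opens a fresh
connection, so the TEMP table would be gone between steps. Below is a sketch
of an execute() that keeps one session and one transaction (untested; the
'filepath' parameter is made up for illustration, and self.sql is assumed to
be a COPY ... FROM STDIN statement):

    # alternative execute(): one connection, one transaction, so the TEMP
    # table survives from the preoperator through the COPY to the postoperator
    def execute(self, context):
        pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        conn = pg.get_conn()  # single session for all statements
        try:
            with conn.cursor() as cur:
                if self.pg_preoperator:
                    self.log.info("Running Postgres preoperator")
                    cur.execute(self.pg_preoperator)
                self.log.info("Loading file into Postgres table")
                # 'filepath' would be an extra __init__ parameter
                with open(self.filepath) as f:
                    cur.copy_expert(self.sql, f)  # COPY ... FROM STDIN
                if self.pg_postoperator:
                    self.log.info("Running Postgres postoperator")
                    cur.execute(self.pg_postoperator)
            conn.commit()  # ON COMMIT DROP temp tables are dropped here
        finally:
            conn.close()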


Best wishes.
-- jiajie
________________________________
From: Flo Rance <[email protected]>
Sent: Thursday, March 21, 2019 22:31
To: [email protected]
Subject: Re: PostgreSQL hook

I've found some more information that seems to confirm my suspicion.

The connection is not persistent between the pg_preoperator step and the
copy_expert one.

There's a suggestion to make persistence a property of the connection:
https://stackoverflow.com/questions/50858770/airflow-retain-the-same-database-connection
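
If I understand it correctly, the idea would be something like this (just a
sketch, untested):

from airflow.hooks.postgres_hook import PostgresHook

class PersistentPostgresHook(PostgresHook):
    """Reuse one cached connection so all statements share a session."""
    def get_conn(self):
        # open the connection once and hand back the same object afterwards,
        # instead of opening a new one on every call
        if getattr(self, '_cached_conn', None) is None or self._cached_conn.closed:
            self._cached_conn = super(PersistentPostgresHook, self).get_conn()
        return self._cached_conn

One thing I'm not sure about: copy_expert seems to close the connection it
uses, so that part might need to be overridden as well.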

I would be very grateful if someone could help me implement that in my
operator that uses PostgreSQL Hook.

Regards,
Flo

On Thu, Mar 21, 2019 at 2:38 PM Flo Rance <[email protected]> wrote:

> No problem.
>
> Thanks for the link, I was able to create a plugin and an operator that do
> almost what I want.
>
> My only issue is regarding the temp table, because it's not available when
> I call copy_expert. So it seems to me that's not the same session as the
> one that created the temp table previously, because if I use a standard
> table I don't have this issue.
>
> Does anyone have an idea how to fix this?
>
> Regards,
> Flo
>
> On Thu, Mar 21, 2019 at 9:36 AM jiajie zhong <[email protected]>
> wrote:
>
>> You could use Airflow plugins; take a look at
>> https://airflow.apache.org/plugins.html.
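>>
>> A minimal plugin file might look roughly like this (just a sketch; the
>> file name and module names are examples, not established ones):
>>
>> # e.g. $AIRFLOW_HOME/plugins/file_to_pg_plugin.py
>> from airflow.plugins_manager import AirflowPlugin
>> from file_to_Pg import FileToPgTransfer  # hypothetical operator module
>>
>> class FileToPgPlugin(AirflowPlugin):
>>     name = "file_to_pg_plugin"
>>     operators = [FileToPgTransfer]
>>
>> # a DAG file can then import it as:
>> # from airflow.operators.file_to_pg_plugin import FileToPgTransfer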
>>
>> BTW, sorry for sending a duplicate e-mail last night; it was due to a
>> network failure.
>>
>> Best wishes.
>> - jiajie
>>
>> ________________________________
>>
>> Hi,
>>
>> Thank you for this explanation. If I summarize, I'll have to write a
>> file_to_postgres Operator, with pg_preoperator and pg_postoperator
>> parameters.
>>
>> Just a simple question: where should I add and store this Operator in the
>> Airflow ecosystem?
>>
>> Regards,
>> Flo
>>
>> On Wed, Mar 20, 2019 at 5:05 PM jiajie zhong <[email protected]>
>> wrote:
>>
>> > Hi, Flo. I am no expert in PG, but I found this code in our master
>> > branch:
>> > https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/hooks/postgres_hook.py#L63-L89
>> > I think maybe this is what you are looking for.
>> >
>> > Also, we recommend using an Operator to do things rather than a Hook
>> > directly, but we have no "local-file-to-pg" operator yet, so you may
>> > have to add this functionality yourself.
>> >
>> > BTW, I think
>> > BEGIN;
>> >
>> > CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog
>> > WITH NO DATA;
>> >
>> > \COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';'
>> > CSV ENCODING 'LATIN1' NULL '';
>> >
>> > DELETE FROM catalog_tmp WHERE code IS NULL;
>> > ...
>> > COMMIT;
>> > is a transaction, so it should be done in a single operator. You could
>> > write code just like
>> >
>> > https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/operators/hive_to_mysql.py#L70-L71
>> >
>> > one that has "pg_preoperator" and "pg_postoperator" parameters, but
>> > extracts data from a local file instead of Hive.
>> >
>> > ________________________________
>> > From: Flo Rance <[email protected]>
>> > Sent: Wednesday, March 20, 2019 23:30
>> > To: [email protected]
>> > Subject: PostgreSQL hook
>> >
>> > Hi,
>> >
>> > I don't know if it's the correct place to ask for that.
>> >
>> > I'm trying to implement one of my cronjobs using Airflow. One of the
>> > tasks is to load files into a temporary table and then update another
>> > table in a Postgres db.
>> > For that, I was previously using a SQL script like this:
>> >
>> > BEGIN;
>> >
>> > CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog
>> > WITH NO DATA;
>> >
>> > \COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';'
>> > CSV ENCODING 'LATIN1' NULL '';
>> >
>> > DELETE FROM catalog_tmp WHERE code IS NULL;
>> > ...
>> > COMMIT;
>> >
>> > I would like to replace \copy with copy_expert from the PostgreSQL
>> > hook. Is that realistic?
>> > If yes, how can I combine a SQL script and that hook in one task?
>> >
>> > Regards,
>> > Flo
>> >
>>
>
