kazanzhy commented on PR #25430: URL: https://github.com/apache/airflow/pull/25430#issuecomment-1201785617
First of all, I made one more error here. `cursor.execute()` is called almost everywhere, except in ExasolHook, where `conn.execute()` is called, and only the latter returns a `CursorResult`. In the other cases there are different cursors for different databases.

So, if I understand correctly, we have to figure out how to determine whether `.fetchall()` can be called on the cursor. I see the following solutions:

1. `try ... except`: straightforward, but can slow down the process.
2. Use one of the cursor attributes (https://peps.python.org/pep-0249/#cursor-attributes):
   * `description is not None`: "... will be None for operations that do not return rows or if the cursor has not had an operation invoked via the .execute*() method yet." We could guarantee that the handler is called only after `.execute` in DbApiHook. But in DbApiHook we call `.execute` many times on the same cursor.
   * `cursor.rowcount > 0`: "... specifies the number of rows that the last .execute*() produced (for DQL statements like SELECT) or affected (for DML statements like UPDATE or INSERT)." We have to fetch results, and there is no guarantee that DML statements return any, even when `rowcount` is positive.
   * `rownumber is not None`: "... should provide the current 0-based index of the cursor in the result set or None if the index cannot be determined. The index can be seen as the index of the cursor in a sequence (the result set). The next fetch operation will fetch the row indexed by .rownumber in that sequence." It probably can't be used.

Here's the implementation for Postgres, and it seems we could use `description`:
https://github.com/psycopg/psycopg2/search?q=notuples

And here are some experiments:

```
from sqlalchemy import create_engine

engine = create_engine('postgresql://reader:[email protected]:5432/pfmegrnargs')
connection = engine.connect().execution_options(autocommit=True).connection

cursor = connection.cursor()
cursor.execute('SELECT 1;')
print(cursor.description)  # (Column(name='?column?', type_code=23),)
print(cursor.rowcount)  # 1
print(cursor.rownumber)  # 0
cursor.fetchall()  # [(1,)]

cursor = connection.cursor()
query = """
CREATE TEMP TABLE IF NOT EXISTS tmp (field TEXT);
"""
cursor.execute(query)
print(cursor.description)  # None
print(cursor.rowcount)  # -1
print(cursor.rownumber)  # 0
cursor.fetchall()  # psycopg2.ProgrammingError: no results to fetch

cursor = connection.cursor()
query = """
CREATE TEMP TABLE IF NOT EXISTS tmp (field TEXT);
INSERT INTO tmp (field) VALUES ('test');
"""
cursor.execute(query)
print(cursor.description)  # None
print(cursor.rowcount)  # -1
print(cursor.rownumber)  # 0
cursor.fetchall()  # psycopg2.ProgrammingError: no results to fetch

cursor = connection.cursor()
query = """
CREATE TEMP TABLE IF NOT EXISTS tmp (field TEXT);
INSERT INTO tmp (field) VALUES ('test') RETURNING *;
"""
cursor.execute(query)
print(cursor.description)  # (Column(name='field', type_code=25),)
print(cursor.rowcount)  # 1
print(cursor.rownumber)  # 0
cursor.fetchall()  # psycopg2.ProgrammingError: no results to fetch

cursor = connection.cursor()
query = """
CREATE TEMP TABLE IF NOT EXISTS tmp (field TEXT);
INSERT INTO tmp (field) VALUES ('test');
SELECT 1;
"""
cursor.execute(query)
print(cursor.description)  # (Column(name='?column?', type_code=23),)
print(cursor.rowcount)  # 1
print(cursor.rownumber)  # 0
cursor.fetchall()  # [(1,)]
```
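For illustration, option 2 with `description` could be sketched as a handler like the one below. This is only a minimal sketch, not the DbApiHook implementation: `fetch_all_if_rows` is a hypothetical name, and it uses the standard-library `sqlite3` driver instead of psycopg2 so that it runs without a database server. One difference to keep in mind: `sqlite3` returns an empty list from `.fetchall()` after a statement without a result set, whereas psycopg2 raises `ProgrammingError` as in the experiments above.

```python
import sqlite3


def fetch_all_if_rows(cursor):
    """Hypothetical handler: fetch only if the last statement produced a result set.

    Relies on the PEP 249 rule that `description` is None for operations
    that do not return rows.
    """
    if cursor.description is None:
        return None
    return cursor.fetchall()


# Assumption: an in-memory SQLite database stands in for a real connection.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# DDL: no result set, so the handler skips fetchall().
cur.execute("CREATE TABLE tmp (field TEXT)")
ddl_result = fetch_all_if_rows(cur)

# DML without RETURNING: still no result set.
cur.execute("INSERT INTO tmp (field) VALUES ('test')")
dml_result = fetch_all_if_rows(cur)

# DQL on the same cursor: description is populated, so rows are fetched.
cur.execute("SELECT field FROM tmp")
select_result = fetch_all_if_rows(cur)

conn.close()
```

In this sketch the same cursor is reused for all three statements, and `description` reflects only the most recent `.execute()`. Other drivers would need to be checked individually before relying on that.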
