NathanFarmer commented on issue #9918:
URL: https://github.com/apache/airflow/issues/9918#issuecomment-662562447


   Solved by overriding the get_records method from the dbapihook class:
   
   ```
   from airflow.hooks.oracle_hook import OracleHook
   import cx_Oracle
   import sys
   
   from datetime import datetime
   from contextlib import closing
   import os
   os.environ['NLS_DATE_FORMAT'] = 'YYYY-MM-DD HH24:MI:SS'
   
   class OracleHookTypeHandler(OracleHook):
       def __init__(self, oracle_conn_id):
           OracleHook.__init__(self, oracle_conn_id)
   
       # Override get_records from inherited class dbapihook
       def get_records(self, sql, parameters=None):
           """
           Executes the sql and returns a set of records.
   
           :param sql: the sql statement to be executed (str) or a list of
               sql statements to execute
           :type sql: str or list
           :param parameters: The parameters to render the SQL query with.
           :type parameters: mapping or iterable
           """
           if sys.version_info[0] < 3:
               sql = sql.encode('utf-8')
   
           with closing(self.get_conn()) as conn:
               with closing(conn.cursor()) as cur:
       
                   cur.outputtypehandler = self.OutputHandler
   
                   if parameters is not None:
                       cur.execute(sql, parameters)
                   else:
                       cur.execute(sql)
                   return cur.fetchall()
   
       # Dealing with invalid years in the database
       def DateTimeConverter(self, value):
           print('DateTimeConverter was called')
           if value.startswith('4712'):
               return None
           return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
   
       def OutputHandler(self, cursor, name, defaulttype, length, precision, 
scale):
           print('OutputHandler was called')
           if defaulttype == cx_Oracle.DATETIME:
               return cursor.var(cx_Oracle.STRING, arraysize=cursor.arraysize,
                                 outconverter=self.DateTimeConverter)
   
   def extract(extract_connection)
       # Return the extracted records
       extract_records_query = 'SELECT col1, col2, col3 FROM table'
       o_extract_hook = OracleHookTypeHandler(oracle_conn_id=extract_connection)
       print('Extract started')
       extract_records = o_extract_hook.get_records(sql=extract_records_query)
       return extract_records
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to