potiuk commented on a change in pull request #20618:
URL: https://github.com/apache/airflow/pull/20618#discussion_r778816203
##########
File path: airflow/providers/mysql/transfers/vertica_to_mysql.py
##########
@@ -94,63 +94,72 @@ def execute(self, context: 'Context'):
vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
- tmpfile = None
- result = None
+ if self.bulk_load:
+ self._bulk_load_transfer(mysql, vertica)
+ else:
+ self._non_bulk_load_transfer(mysql, vertica)
- selected_columns = []
+ if self.mysql_postoperator:
+ self.log.info("Running MySQL postoperator...")
+ mysql.run(self.mysql_postoperator)
- count = 0
+ self.log.info("Done")
+
+ def _non_bulk_load_transfer(self, mysql, vertica):
with closing(vertica.get_conn()) as conn:
with closing(conn.cursor()) as cursor:
cursor.execute(self.sql)
selected_columns = [d.name for d in cursor.description]
+ self.log.info("Selecting rows from Vertica...")
+ self.log.info(self.sql)
- if self.bulk_load:
- with NamedTemporaryFile("w") as tmpfile:
- self.log.info("Selecting rows from Vertica to local
file %s...", tmpfile.name)
- self.log.info(self.sql)
+ result = cursor.fetchall()
+ count = len(result)
- csv_writer = csv.writer(tmpfile, delimiter='\t',
encoding='utf-8')
- for row in cursor.iterate():
- csv_writer.writerow(row)
- count += 1
+ self.log.info("Selected rows from Vertica %s", count)
+ self._run_preoperator(mysql)
+ try:
+ self.log.info("Inserting rows into MySQL...")
+ mysql.insert_rows(table=self.mysql_table, rows=result,
target_fields=selected_columns)
+ self.log.info("Inserted rows into MySQL %s", count)
+ except (MySQLdb.Error, MySQLdb.Warning):
+ self.log.info("Inserted rows into MySQL 0")
Review comment:
Ah no - it's actually fine. It logs an info message saying that no rows were
inserted and then re-raises the exception, so things are good here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]