easontm edited a comment on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-948333552


   Sure, here they are. This particular log run is missing the `.tell()` calls, but I confirmed early on that there actually was data in the MySQL query results and that the csv writer was writing it to the file (roughly the check sketched below). The `sleep(90)` at the end is just to keep the Kubernetes pod alive long enough for me to SSH into it and verify the file name and contents.
   
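   For context, the kind of check I was doing looks roughly like this. It is a standalone sketch, not the real task: the query is a placeholder and only the `mysql_rds` connection id matches the (redacted) DAG.
   
   ```python
   import os
   from tempfile import NamedTemporaryFile
   
   import unicodecsv as csv
   
   from airflow.providers.mysql.hooks.mysql import MySqlHook
   
   # Placeholder query; the real SQL is redacted above.
   hook = MySqlHook(mysql_conn_id="mysql_rds")
   conn = hook.get_conn()
   cursor = conn.cursor()
   cursor.execute("SELECT 1 AS dummy")
   
   with NamedTemporaryFile("wb", delete=False) as f:
       writer = csv.writer(f, delimiter=chr(1), encoding="utf-8")
       writer.writerows(cursor)
       f.flush()
       # If the csv writer actually wrote rows, both the write offset and the
       # on-disk size should be non-zero.
       print(f.name, f.tell(), os.path.getsize(f.name))
   ```
   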
   <details><summary>operator</summary>
   code
   
   
   ```python
   from collections import OrderedDict
   from tempfile import NamedTemporaryFile
   from time import sleep
   from typing import Dict, Optional
   
   import MySQLdb
   import unicodecsv as csv
   
   from airflow.models import BaseOperator
   from airflow.providers.apache.hive.hooks.hive import HiveCliHook
   from airflow.providers.mysql.hooks.mysql import MySqlHook
   
   
   class CustomMySqlToHiveOperator(BaseOperator):
       
       template_fields = ('sql', 'partition', 'hive_table')
       template_ext = ('.sql',)
       ui_color = '#a0e08c'
   
       def __init__(
           self,
           *,
           sql: str,
           hive_table: str,
           create: bool = True,
           recreate: bool = False,
           partition: Optional[Dict] = None,
           delimiter: str = chr(1),
           quoting: Optional[int] = None,
           quotechar: str = '"',
           escapechar: Optional[str] = None,
           mysql_conn_id: str = 'mysql_default',
           hive_cli_conn_id: str = 'hive_cli_default',
           tblproperties: Optional[Dict] = None,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           self.sql = sql
           self.hive_table = hive_table
           self.create = create
           self.recreate = recreate
           self.delimiter = str(delimiter)
           self.quoting = quoting or csv.QUOTE_MINIMAL
           self.quotechar = quotechar
           self.escapechar = escapechar
           self.mysql_conn_id = mysql_conn_id
           self.hive_cli_conn_id = hive_cli_conn_id
           self.partition = partition or {}
           self.tblproperties = tblproperties
   
       @classmethod
       def type_map(cls, mysql_type: int) -> str:
           """Maps MySQL type to Hive type."""
           types = MySQLdb.constants.FIELD_TYPE
           type_map = {
               types.BIT: 'INT',
               types.DECIMAL: 'DOUBLE',
               types.NEWDECIMAL: 'DOUBLE',
               types.DOUBLE: 'DOUBLE',
               types.FLOAT: 'DOUBLE',
               types.INT24: 'INT',
               types.LONG: 'BIGINT',
               types.LONGLONG: 'DECIMAL(38,0)',
               types.SHORT: 'INT',
               types.TINY: 'SMALLINT',
               types.YEAR: 'INT',
               types.TIMESTAMP: 'TIMESTAMP',
           }
           return type_map.get(mysql_type, 'STRING')
   
       def execute(self, context: Dict[str, str]):
           hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
           mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
           
           import os
           failed = False
           fname = None
           field_dict = OrderedDict()
           def UmaskNamedTemporaryFile(*args, **kargs):
               # This code shamelessly lifted from S/O for debugging
               fdesc = NamedTemporaryFile(*args, **kargs)
               umask = os.umask(0o666)
               os.umask(umask)
               os.chmod(fdesc.name, 0o666 & ~umask)
               return fdesc
           
           self.log.info("Dumping MySQL query results to local file")
           conn = mysql.get_conn()
           cursor = conn.cursor()
           cursor.execute(self.sql)
           with UmaskNamedTemporaryFile("wb", delete=False) as f:
               csv_writer = csv.writer(
                   f,
                   delimiter=self.delimiter,
                   quoting=self.quoting,
                   quotechar=self.quotechar,
                   escapechar=self.escapechar,
                   encoding="utf-8",
               )
               # field_dict = OrderedDict()
               for field in cursor.description:
                   field_dict[field[0]] = self.type_map(field[1])
               csv_writer.writerows(cursor)
               f.flush()
               print(f.name)
               print(f.tell()) # writer offset
               fname = f.name
               cursor.close()
               conn.close()
               self.log.info("Loading file into Hive")
               sleep(10) # want to make sure everything is finished writing
               try:
                   hive.load_file(
                       f.name,
                       self.hive_table,
                       field_dict=field_dict,
                       create=self.create,
                       partition=self.partition,
                       delimiter=self.delimiter,
                       recreate=self.recreate,
                       tblproperties=self.tblproperties,
                   )
               except Exception:
                   print("hive failed")
                   failed = True
   
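           # Second attempt, now outside the `with` block, i.e. after the
           # NamedTemporaryFile has been closed (delete=False keeps it on disk).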
           try:
               hive.load_file(
                   f.name,
                   self.hive_table,
                   field_dict=field_dict,
                   create=self.create,
                   partition=self.partition,
                   delimiter=self.delimiter,
                   recreate=self.recreate,
                   tblproperties=self.tblproperties,
               )
           except Exception:
               print("hive failed AGAIN")
               failed = True
   
           sleep(90)
           if failed:
               raise Exception("still broken")
   
   
   ```
   </details>
   <details><summary>logs</summary>
   
   
   ```
   [2021-10-21 04:55:41,821] {taskinstance.py:1115} INFO - Executing <Task(CustomMySqlToHiveOperator): REDACTED> on 2021-10-17T15:00:00+00:00
   [2021-10-21 04:55:41,825] {standard_task_runner.py:52} INFO - Started process 16 to run task
   [2021-10-21 04:55:41,828] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'REDACTED', 'REDACTED', '2021-10-17T15:00:00+00:00', '--job-id', '772796', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/REDACTED.py', '--cfg-path', '/tmp/tmpefopql15', '--error-file', '/tmp/tmpd9rg6cle']
   [2021-10-21 04:55:41,829] {standard_task_runner.py:77} INFO - Job 772796: Subtask REDACTED
   [2021-10-21 04:55:41,969] {logging_mixin.py:109} INFO - Running <TaskInstance: REDACTED.REDACTED 2021-10-17T15:00:00+00:00 [running]> on host REDACTED
   [2021-10-21 04:55:42,136] {taskinstance.py:1254} INFO - Exporting the following env vars:
   REDACTED
   [2021-10-21 04:55:42,163] {base.py:79} INFO - Using connection to: id: hive_cli_default. Host: REDACTED, Port: 10000, Schema: default, Login: REDACTED, Password: REDACTED, extra: {'use_beeline': True, 'hive_cli_params': '--silent=true --hiveconf hive.exec.stagingdir=s3://REDACTED'}
   [2021-10-21 04:55:42,163] {custom_mysql_to_hive.py:155} INFO - Dumping MySQL query results to local file
   [2021-10-21 04:55:42,189] {base.py:79} INFO - Using connection to: id: mysql_rds. Host: REDACTED, Port: None, Schema: REDACTED Login: REDACTED, Password: REDACTED, extra: {}
   [2021-10-21 04:55:42,238] {logging_mixin.py:109} INFO - /tmp/tmphdv9t5yh
   [2021-10-21 04:55:42,239] {custom_mysql_to_hive.py:176} INFO - Loading file into Hive
   [2021-10-21 04:55:42,239] {hive.py:458} INFO - LOAD DATA LOCAL INPATH '/tmp/tmphdv9t5yh' OVERWRITE INTO TABLE REDACTED.REDACTED PARTITION (dt='2021-10-17', hh='15');
   
   [2021-10-21 04:55:42,239] {hive.py:247} INFO - beeline -u "jdbc:hive2://REDACTED:10000/default;auth=noSasl" -n hive --silent=true --hiveconf hive.exec.stagingdir=s3://REDACTED/ -hiveconf airflow.ctx.dag_id=REDACTED -hiveconf airflow.ctx.task_id=REDACTED -hiveconf airflow.ctx.execution_date=2021-10-17T15:00:00+00:00 -hiveconf airflow.ctx.dag_run_id=scheduled__2021-10-17T15:00:00+00:00 -hiveconf airflow.ctx.dag_owner=REDACTED -hiveconf airflow.ctx.dag_email=REDACTED -f /tmp/airflow_hiveop_wly5owxt/tmprxsx_v66
   [2021-10-21 04:55:42,254] {hive.py:259} INFO - /bin/bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)
   [2021-10-21 04:55:42,648] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - airflow.ctx.dag_id=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - airflow.ctx.task_id=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,649] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.execution_date=2021-10-17T15:00:00+00:00 (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.dag_run_id=scheduled__2021-10-17T15:00:00+00:00 (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.dag_owner=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,651] {hive.py:259} INFO - -hiveconf (No such file or directory)
   [2021-10-21 04:55:42,651] {hive.py:259} INFO - airflow.ctx.dag_email=REDACTED (No such file or directory)
   [2021-10-21 04:55:42,759] {hive.py:259} INFO - Error: Error while compiling statement: FAILED: SemanticException Line 1:23 Invalid path ''/tmp/tmphdv9t5yh'': No files matching path file:/tmp/tmphdv9t5yh (state=42000,code=40000)
   [2021-10-21 04:55:42,783] {logging_mixin.py:109} INFO - hive failed
   [2021-10-21 04:56:42,841] {taskinstance.py:1463} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
       result = task_copy.execute(context=context)
     File "/usr/local/airflow/dags/operators/custom_mysql_to_hive.py", line 193, in execute
       raise Exception("still broken")
   Exception: still broken
   [2021-10-21 04:56:42,851] {taskinstance.py:1513} INFO - Marking task as FAILED. dag_id=REDACTED, task_id=REDACTED, execution_date=20211017T150000, start_date=20211021T045541, end_date=20211021T045642
   [2021-10-21 04:56:42,927] {local_task_job.py:151} INFO - Task exited with return code 1
   ```
   </details>

