easontm commented on issue #13391:
URL: https://github.com/apache/airflow/issues/13391#issuecomment-948333552
Sure, here they are. This particular log run is missing the `.tell()` calls,
but I confirmed early on that the MySQL query actually returned data and that
the writer was writing it. The `sleep(90)` at the end is just to keep the
Kubernetes pod alive so I could SSH into it and verify the file name and
contents (a minimal standalone sketch of that temp-file check follows the
operator code below).
<details><summary>operator</summary>
```python
from collections import OrderedDict
from tempfile import NamedTemporaryFile
from time import sleep
from typing import Dict, Optional

import MySQLdb
import unicodecsv as csv

from airflow.models import BaseOperator
from airflow.providers.apache.hive.hooks.hive import HiveCliHook
from airflow.providers.mysql.hooks.mysql import MySqlHook


class CustomMySqlToHiveOperator(BaseOperator):
    template_fields = ('sql', 'partition', 'hive_table')
    template_ext = ('.sql',)
    ui_color = '#a0e08c'

    def __init__(
        self,
        *,
        sql: str,
        hive_table: str,
        create: bool = True,
        recreate: bool = False,
        partition: Optional[Dict] = None,
        delimiter: str = chr(1),
        quoting: Optional[str] = None,
        quotechar: str = '"',
        escapechar: Optional[str] = None,
        mysql_conn_id: str = 'mysql_default',
        hive_cli_conn_id: str = 'hive_cli_default',
        tblproperties: Optional[Dict] = None,
    ) -> None:
        super().__init__(**kwargs)
        self.sql = sql
        self.hive_table = hive_table
        self.create = create
        self.recreate = recreate
        self.delimiter = str(delimiter)
        self.quoting = quoting or csv.QUOTE_MINIMAL
        self.quotechar = quotechar
        self.escapechar = escapechar
        self.mysql_conn_id = mysql_conn_id
        self.hive_cli_conn_id = hive_cli_conn_id
        self.partition = partition or {}
        self.tblproperties = tblproperties

    @classmethod
    def type_map(cls, mysql_type: int) -> str:
        """Maps MySQL type to Hive type."""
        types = MySQLdb.constants.FIELD_TYPE
        type_map = {
            types.BIT: 'INT',
            types.DECIMAL: 'DOUBLE',
            types.NEWDECIMAL: 'DOUBLE',
            types.DOUBLE: 'DOUBLE',
            types.FLOAT: 'DOUBLE',
            types.INT24: 'INT',
            types.LONG: 'BIGINT',
            types.LONGLONG: 'DECIMAL(38,0)',
            types.SHORT: 'INT',
            types.TINY: 'SMALLINT',
            types.YEAR: 'INT',
            types.TIMESTAMP: 'TIMESTAMP',
        }
        return type_map.get(mysql_type, 'STRING')

    def execute(self, context: Dict[str, str]):
        hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
        mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        import os  # used by the umask helper below

        failed = False
        fname = None
        field_dict = OrderedDict()

        def UmaskNamedTemporaryFile(*args, **kargs):
            # This code shamelessly lifted from S/O for debugging: chmod the
            # temp file so users other than the worker can read it.
            fdesc = NamedTemporaryFile(*args, **kargs)
            umask = os.umask(0o666)
            os.umask(umask)
            os.chmod(fdesc.name, 0o666 & ~umask)
            return fdesc

        self.log.info("Dumping MySQL query results to local file")
        conn = mysql.get_conn()
        cursor = conn.cursor()
        cursor.execute(self.sql)
        with UmaskNamedTemporaryFile("wb", delete=False) as f:
            csv_writer = csv.writer(
                f,
                delimiter=self.delimiter,
                quoting=self.quoting,
                quotechar=self.quotechar,
                escapechar=self.escapechar,
                encoding="utf-8",
            )
            for field in cursor.description:
                field_dict[field[0]] = self.type_map(field[1])
            csv_writer.writerows(cursor)
            f.flush()
            print(f.name)
            print(f.tell())  # writer offset; confirms bytes were written
            fname = f.name
        cursor.close()
        conn.close()

        self.log.info("Loading file into Hive")
        sleep(10)  # want to make sure everything is finished writing
        try:
            hive.load_file(
                f.name,
                self.hive_table,
                field_dict=field_dict,
                create=self.create,
                partition=self.partition,
                delimiter=self.delimiter,
                recreate=self.recreate,
                tblproperties=self.tblproperties,
            )
        except Exception:
            print("hive failed")
            failed = True
        # Retry once to see whether a second attempt behaves any differently.
        try:
            hive.load_file(
                f.name,
                self.hive_table,
                field_dict=field_dict,
                create=self.create,
                partition=self.partition,
                delimiter=self.delimiter,
                recreate=self.recreate,
                tblproperties=self.tblproperties,
            )
        except Exception:
            print("hive failed AGAIN")
            failed = True
        sleep(90)  # keep the pod alive so the file can be inspected over SSH
        if failed:
            raise Exception("still broken")
```
</details>
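As a sanity check on the dump step, here is a minimal standalone sketch (stdlib only; the sample bytes and Ctrl-A delimiter are illustrative, not the operator's real data) of what the `.tell()` and file checks above verify: with `delete=False`, the file should persist past the `with` block, hold the bytes written, and, for a cross-user read, be world-readable.

```python
# Minimal standalone sketch of the temp-file check, independent of Airflow.
# The sample bytes are made up; rows are Ctrl-A delimited like the operator's.
import os
import stat
from tempfile import NamedTemporaryFile

with NamedTemporaryFile("wb", delete=False) as f:
    f.write(b"1\x01foo\n2\x01bar\n")
    f.flush()
    print(f.name, f.tell())  # writer offset should match the bytes written (12)
    fname = f.name

# After the context manager closes, the file should still exist (delete=False)
# and be non-empty; S_IROTH shows whether other users could read it.
st = os.stat(fname)
print(os.path.exists(fname), st.st_size, bool(st.st_mode & stat.S_IROTH))
```

Note that temp files are created `0600` on POSIX, which is presumably why the operator wraps `NamedTemporaryFile` in the umask/chmod helper.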
<details><summary>logs</summary>
```
[2021-10-21 04:55:41,821] {taskinstance.py:1115} INFO - Executing <Task(CustomMySqlToHiveOperator): REDACTED> on 2021-10-17T15:00:00+00:00
[2021-10-21 04:55:41,825] {standard_task_runner.py:52} INFO - Started process 16 to run task
[2021-10-21 04:55:41,828] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'REDACTED', 'REDACTED', '2021-10-17T15:00:00+00:00', '--job-id', '772796', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/REDACTED.py', '--cfg-path', '/tmp/tmpefopql15', '--error-file', '/tmp/tmpd9rg6cle']
[2021-10-21 04:55:41,829] {standard_task_runner.py:77} INFO - Job 772796: Subtask REDACTED
[2021-10-21 04:55:41,969] {logging_mixin.py:109} INFO - Running <TaskInstance: REDACTED.REDACTED 2021-10-17T15:00:00+00:00 [running]> on host REDACTED
[2021-10-21 04:55:42,136] {taskinstance.py:1254} INFO - Exporting the following env vars:
REDACTED
[2021-10-21 04:55:42,163] {base.py:79} INFO - Using connection to: id: hive_cli_default. Host: REDACTED, Port: 10000, Schema: default, Login: REDACTED, Password: REDACTED, extra: {'use_beeline': True, 'hive_cli_params': '--silent=true --hiveconf hive.exec.stagingdir=s3://REDACTED'}
[2021-10-21 04:55:42,163] {custom_mysql_to_hive.py:155} INFO - Dumping MySQL query results to local file
[2021-10-21 04:55:42,189] {base.py:79} INFO - Using connection to: id: mysql_rds. Host: REDACTED, Port: None, Schema: REDACTED Login: REDACTED, Password: REDACTED, extra: {}
[2021-10-21 04:55:42,238] {logging_mixin.py:109} INFO - /tmp/tmphdv9t5yh
[2021-10-21 04:55:42,239] {custom_mysql_to_hive.py:176} INFO - Loading file into Hive
[2021-10-21 04:55:42,239] {hive.py:458} INFO - LOAD DATA LOCAL INPATH '/tmp/tmphdv9t5yh' OVERWRITE INTO TABLE REDACTED.REDACTED PARTITION (dt='2021-10-17', hh='15');
[2021-10-21 04:55:42,239] {hive.py:247} INFO - beeline -u "jdbc:hive2://REDACTED:10000/default;auth=noSasl" -n hive --silent=true --hiveconf hive.exec.stagingdir=s3://REDACTED/ -hiveconf airflow.ctx.dag_id=REDACTED -hiveconf airflow.ctx.task_id=REDACTED -hiveconf airflow.ctx.execution_date=2021-10-17T15:00:00+00:00 -hiveconf airflow.ctx.dag_run_id=scheduled__2021-10-17T15:00:00+00:00 -hiveconf airflow.ctx.dag_owner=REDACTED -hiveconf airflow.ctx.dag_email=REDACTED -f /tmp/airflow_hiveop_wly5owxt/tmprxsx_v66
[2021-10-21 04:55:42,254] {hive.py:259} INFO - /bin/bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)
[2021-10-21 04:55:42,648] {hive.py:259} INFO - -hiveconf (No such file or directory)
[2021-10-21 04:55:42,649] {hive.py:259} INFO - airflow.ctx.dag_id=REDACTED (No such file or directory)
[2021-10-21 04:55:42,649] {hive.py:259} INFO - -hiveconf (No such file or directory)
[2021-10-21 04:55:42,649] {hive.py:259} INFO - airflow.ctx.task_id=REDACTED (No such file or directory)
[2021-10-21 04:55:42,649] {hive.py:259} INFO - -hiveconf (No such file or directory)
[2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.execution_date=2021-10-17T15:00:00+00:00 (No such file or directory)
[2021-10-21 04:55:42,650] {hive.py:259} INFO - -hiveconf (No such file or directory)
[2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.dag_run_id=scheduled__2021-10-17T15:00:00+00:00 (No such file or directory)
[2021-10-21 04:55:42,650] {hive.py:259} INFO - -hiveconf (No such file or directory)
[2021-10-21 04:55:42,650] {hive.py:259} INFO - airflow.ctx.dag_owner=REDACTED (No such file or directory)
[2021-10-21 04:55:42,651] {hive.py:259} INFO - -hiveconf (No such file or directory)
[2021-10-21 04:55:42,651] {hive.py:259} INFO - airflow.ctx.dag_email=REDACTED (No such file or directory)
[2021-10-21 04:55:42,759] {hive.py:259} INFO - Error: Error while compiling statement: FAILED: SemanticException Line 1:23 Invalid path ''/tmp/tmphdv9t5yh'': No files matching path file:/tmp/tmphdv9t5yh (state=42000,code=40000)
[2021-10-21 04:55:42,783] {logging_mixin.py:109} INFO - hive failed
[2021-10-21 04:56:42,841] {taskinstance.py:1463} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/airflow/dags/operators/custom_mysql_to_hive.py", line 193, in execute
    raise Exception("still broken")
Exception: still broken
[2021-10-21 04:56:42,851] {taskinstance.py:1513} INFO - Marking task as FAILED. dag_id=REDACTED, task_id=REDACTED, execution_date=20211017T150000, start_date=20211021T045541, end_date=20211021T045642
[2021-10-21 04:56:42,927] {local_task_job.py:151} INFO - Task exited with return code 1
```
</details>
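One hedged observation about the failure itself: the worker prints the temp file name at 04:55:42,238, yet a second later beeline reports `No files matching path file:/tmp/tmphdv9t5yh`. A hypothetical pre-flight check like the sketch below (the `preflight` helper and its placement are illustrative, not part of the operator) run immediately before `hive.load_file()` would separate "file never written or already gone on the worker" from "file present on the worker but not visible wherever beeline resolves `LOAD DATA LOCAL INPATH`".

```python
# Hypothetical pre-flight check (illustrative; not part of the operator):
# verify the dumped file on the worker immediately before hive.load_file().
import os
import stat

def preflight(path: str) -> None:
    if not os.path.exists(path):
        raise FileNotFoundError(f"dump file missing on worker: {path}")
    st = os.stat(path)
    if st.st_size == 0:
        raise ValueError(f"dump file is empty: {path}")
    if not st.st_mode & stat.S_IROTH:
        # HiveServer2 may run as a different user than the Airflow worker.
        print(f"warning: {path} is not world-readable")

try:
    preflight("/tmp/tmphdv9t5yh")  # path taken from the log; hypothetical here
    print("dump file looks fine on this host")
except (FileNotFoundError, ValueError) as err:
    print(err)
```

If a check like that passes, the remaining suspects would be the host and user that beeline/HiveServer2 resolve `file:/tmp/...` against, which would line up with the `use_beeline: True` connection extra in the log.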