Repository: incubator-airflow
Updated Branches:
  refs/heads/master e48b8e36a -> 67b351183


[AIRFLOW-2448] Enhance HiveCliHook.load_df to work with datetime

HiveCliHook.load_df currently cannot handle a DataFrame
that contains datetime values.
This PR enhances it to work with datetime,
fixes some bugs introduced by AIRFLOW-2441,
and addresses some flake8 issues.

Closes #3364 from sekikn/AIRFLOW-2448


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67b35118
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67b35118
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67b35118

Branch: refs/heads/master
Commit: 67b351183b0f85e9484f1f7f70e0b46300753b60
Parents: e48b8e3
Author: Kengo Seki <sek...@apache.org>
Authored: Sat May 19 16:59:44 2018 +0100
Committer: Kaxil Naik <kaxiln...@apache.org>
Committed: Sat May 19 17:00:47 2018 +0100

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py   | 30 +++++++++++++++----------
 tests/hooks/test_hive_hook.py | 45 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 62 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67b35118/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 92958c0..01a6a2a 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -20,11 +20,12 @@
 
 from __future__ import print_function, unicode_literals
 from six.moves import zip
-from past.builtins import basestring
+from past.builtins import basestring, unicode
 
 import unicodecsv as csv
 import itertools
 import re
+import six
 import subprocess
 import time
 from collections import OrderedDict
@@ -316,15 +317,16 @@ class HiveCliHook(BaseHook):
 
         def _infer_field_types_from_df(df):
             DTYPE_KIND_HIVE_TYPE = {
-                'b': 'BOOLEAN',  # boolean
-                'i': 'BIGINT',   # signed integer
-                'u': 'BIGINT',   # unsigned integer
-                'f': 'DOUBLE',   # floating-point
-                'c': 'STRING',   # complex floating-point
-                'O': 'STRING',   # object
-                'S': 'STRING',   # (byte-)string
-                'U': 'STRING',   # Unicode
-                'V': 'STRING'    # void
+                'b': 'BOOLEAN',    # boolean
+                'i': 'BIGINT',     # signed integer
+                'u': 'BIGINT',     # unsigned integer
+                'f': 'DOUBLE',     # floating-point
+                'c': 'STRING',     # complex floating-point
+                'M': 'TIMESTAMP',  # datetime
+                'O': 'STRING',     # object
+                'S': 'STRING',     # (byte-)string
+                'U': 'STRING',     # Unicode
+                'V': 'STRING'      # void
             }
 
             d = OrderedDict()
@@ -336,15 +338,19 @@ class HiveCliHook(BaseHook):
             pandas_kwargs = {}
 
         with TemporaryDirectory(prefix='airflow_hiveop_') as tmp_dir:
-            with NamedTemporaryFile(dir=tmp_dir) as f:
+            with NamedTemporaryFile(dir=tmp_dir, mode="w") as f:
 
                 if field_dict is None and (create or recreate):
                     field_dict = _infer_field_types_from_df(df)
 
                 df.to_csv(path_or_buf=f,
-                          sep=delimiter.encode(encoding),
+                          sep=(delimiter.encode(encoding)
+                               if six.PY2 and isinstance(delimiter, unicode)
+                               else delimiter),
                           header=False,
                           index=False,
+                          encoding=encoding,
+                          date_format="%Y-%m-%d %H:%M:%S",
                           **pandas_kwargs)
                 f.flush()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67b35118/tests/hooks/test_hive_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py
index 3c278e3..eadb0f1 100644
--- a/tests/hooks/test_hive_hook.py
+++ b/tests/hooks/test_hive_hook.py
@@ -21,6 +21,7 @@
 import datetime
 import pandas as pd
 import random
+import re
 
 import mock
 import unittest
@@ -125,7 +126,7 @@ class TestHiveCliHook(unittest.TestCase):
         kwargs = mock_to_csv.call_args[1]
         self.assertEqual(kwargs["header"], False)
         self.assertEqual(kwargs["index"], False)
-        self.assertEqual(kwargs["sep"], delimiter.encode(encoding))
+        self.assertEqual(kwargs["sep"], delimiter)
 
         mock_load_file.assert_called_once()
         kwargs = mock_load_file.call_args[1]
@@ -134,6 +135,48 @@ class TestHiveCliHook(unittest.TestCase):
         self.assertTrue(isinstance(kwargs["field_dict"], OrderedDict))
         self.assertEqual(kwargs["table"], table)
 
+    @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.run_cli')
+    def test_load_df_with_data_types(self, mock_run_cli):
+        d = OrderedDict()
+        d['b'] = [True]
+        d['i'] = [-1]
+        d['t'] = [1]
+        d['f'] = [0.0]
+        d['c'] = ['c']
+        d['M'] = [datetime.datetime(2018, 1, 1)]
+        d['O'] = [object()]
+        d['S'] = ['STRING'.encode('utf-8')]
+        d['U'] = ['STRING']
+        d['V'] = [None]
+        df = pd.DataFrame(d)
+
+        hook = HiveCliHook()
+        hook.load_df(df, 't')
+
+        query = """
+            CREATE TABLE IF NOT EXISTS t (
+                b BOOLEAN,
+                i BIGINT,
+                t BIGINT,
+                f DOUBLE,
+                c STRING,
+                M TIMESTAMP,
+                O STRING,
+                S STRING,
+                U STRING,
+                V STRING)
+            ROW FORMAT DELIMITED
+            FIELDS TERMINATED BY ','
+            STORED AS textfile
+            ;
+        """
+
+        def _trim(s):
+            return re.sub("\s+", " ", s.strip())
+
+        self.assertEqual(_trim(mock_run_cli.call_args_list[0][0][0]),
+                         _trim(query))
+
 
 class TestHiveMetastoreHook(HiveEnvironmentTest):
     VALID_FILTER_MAP = {'key2': 'value2'}

Reply via email to