Repository: incubator-airflow
Updated Branches:
  refs/heads/master fd381a11e -> 4f459b64e


[AIRFLOW-1217] Enable Sqoop logging

The output of the subprocess was not redirected to
Airflow. Now the Sqoop output is written to Airflow
and captured in the logs.
Extended the tests of the sqoop popen operation.

Closes #2307 from Fokko/AIRFLOW-1217-enable-sqoop-logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4f459b64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4f459b64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4f459b64

Branch: refs/heads/master
Commit: 4f459b64e098f4fd73af23030e7992e0465bfe21
Parents: fd381a1
Author: Fokko Driesprong <[email protected]>
Authored: Sun May 21 20:30:25 2017 +0200
Committer: Bolke de Bruin <[email protected]>
Committed: Sun May 21 20:30:25 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/sqoop_hook.py    | 39 +++++++++++-------
 tests/contrib/hooks/test_sqoop_hook.py | 63 ++++++++++++++++++++++++-----
 2 files changed, 78 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4f459b64/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 2dd33ae..7fbb6c5 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -68,12 +68,20 @@ class SqoopHook(BaseHook):
         self.hcatalog_database = hcatalog_database
         self.hcatalog_table = hcatalog_table
         self.verbose = verbose
-        self.num_mappers = str(num_mappers)
+        self.num_mappers = num_mappers
         self.properties = properties
 
     def get_conn(self):
         pass
 
+    def cmd_mask_password(self, cmd):
+        try:
+            password_index = cmd.index('--password')
+            cmd[password_index + 1] = 'MASKED'
+        except ValueError:
+            logging.debug("No password in sqoop cmd")
+        return cmd
+
     def Popen(self, cmd, **kwargs):
         """
         Remote Popen
@@ -82,18 +90,21 @@ class SqoopHook(BaseHook):
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         :return: handle to subprocess
         """
-        process = subprocess.Popen(cmd,
-                                   stdout=subprocess.PIPE,
-                                   stderr=subprocess.PIPE,
-                                   **kwargs)
-        output, stderr = process.communicate()
-
-        if process.returncode != 0:
-            raise AirflowException(
-                "Cannot execute {} on {}. Error code is: {} Output: {}, "
-                "Stderr: {}".format(cmd, self.conn.host, process.returncode,
-                                    output, stderr)
-            )
+        logging.info("Executing command: {}".format(' '.join(cmd)))
+        sp = subprocess.Popen(cmd,
+                              stdout=subprocess.PIPE,
+                              stderr=subprocess.STDOUT,
+                              **kwargs)
+
+        for line in iter(sp.stdout):
+            logging.info(line.strip())
+
+        sp.wait()
+
+        logging.info("Command exited with return code {0}".format(sp.returncode))
+
+        if sp.returncode:
+            raise AirflowException("Sqoop command failed: {}".format(' '.join(cmd)))
 
     def _prepare_command(self, export=False):
         if export:
@@ -120,7 +131,7 @@ class SqoopHook(BaseHook):
         if self.archives:
             connection_cmd += ["-archives", self.archives]
         if self.num_mappers:
-            connection_cmd += ["--num-mappers", self.num_mappers]
+            connection_cmd += ["--num-mappers", str(self.num_mappers)]
         if self.hcatalog_database:
             connection_cmd += ["--hcatalog-database", self.hcatalog_database]
         if self.hcatalog_table:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4f459b64/tests/contrib/hooks/test_sqoop_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_sqoop_hook.py b/tests/contrib/hooks/test_sqoop_hook.py
index 94fcada..7c934b9 100644
--- a/tests/contrib/hooks/test_sqoop_hook.py
+++ b/tests/contrib/hooks/test_sqoop_hook.py
@@ -21,6 +21,10 @@ from airflow.contrib.hooks.sqoop_hook import SqoopHook
 from airflow.exceptions import AirflowException
 from airflow.utils import db
 
+from mock import patch, call
+
+from io import StringIO
+
 
 class TestSqoopHook(unittest.TestCase):
     _config = {
@@ -69,20 +73,46 @@ class TestSqoopHook(unittest.TestCase):
         configuration.load_test_config()
         db.merge_conn(
             models.Connection(
-                conn_id='sqoop_test', conn_type='sqoop',
+                conn_id='sqoop_test', conn_type='sqoop', schema='schema',
                 host='rmdbs', port=5050, extra=json.dumps(self._config_json)
             )
         )
 
-    def test_popen(self):
-        hook = SqoopHook(**self._config)
-
-        # Should go well
-        hook.Popen(['ls'])
-
-        # Should give an exception
-        with self.assertRaises(OSError):
-            hook.Popen('exit 1')
+    @patch('subprocess.Popen')
+    def test_popen(self, mock_popen):
+        # Given
+        mock_popen.return_value.stdout = StringIO(u'stdout')
+        mock_popen.return_value.stderr = StringIO(u'stderr')
+        mock_popen.return_value.returncode = 0
+        mock_popen.return_value.communicate.return_value = [StringIO(u'stdout\nstdout'), StringIO(u'stderr\nstderr')]
+
+        # When
+        hook = SqoopHook(conn_id='sqoop_test')
+        hook.export_table(**self._config_export)
+
+        # Then
+        self.assertEqual(mock_popen.mock_calls[0], call(
+            ['sqoop',
+             'export',
+             '-jt', self._config_json['job_tracker'],
+             '-libjars', self._config_json['libjars'],
+             '-files', self._config_json['files'],
+             '-fs', self._config_json['namenode'],
+             '-archives', self._config_json['archives'],
+             '--connect', 'rmdbs:5050/schema',
+             '--input-null-string', self._config_export['input_null_string'],
+             '--input-null-non-string', self._config_export['input_null_non_string'],
+             '--staging-table', self._config_export['staging_table'],
+             '--clear-staging-table',
+             '--enclosed-by', self._config_export['enclosed_by'],
+             '--escaped-by', self._config_export['escaped_by'],
+             '--input-fields-terminated-by', self._config_export['input_fields_terminated_by'],
+             '--input-lines-terminated-by', self._config_export['input_lines_terminated_by'],
+             '--input-optionally-enclosed-by', self._config_export['input_optionally_enclosed_by'],
+             '--batch',
+             '--relaxed-isolation',
+             '--export-dir', self._config_export['export_dir'],
+             '--table', self._config_export['table']], stderr=-2, stdout=-1))
 
     def test_submit(self):
         hook = SqoopHook(**self._config)
@@ -228,6 +258,19 @@ class TestSqoopHook(unittest.TestCase):
         with self.assertRaises(AirflowException):
             hook._get_export_format_argument('unknown')
 
+    def test_cmd_mask_password(self):
+        hook = SqoopHook()
+        self.assertEqual(
+            hook.cmd_mask_password(['--password', 'supersecret']),
+            ['--password', 'MASKED']
+        )
+
+        cmd = ['--target', 'targettable']
+        self.assertEqual(
+            hook.cmd_mask_password(cmd),
+            cmd
+        )
+
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to