Repository: incubator-airflow Updated Branches: refs/heads/master 0dd00291d -> 751e936ac
[AIRFLOW-1393] Enable Py3 tests in contrib/spark_submit_hook The unit tests in `tests/contrib/hooks/test_spark_submit_hook.py` were skipped if run in Python3 because some test cases loop forever due to a mismatch/misunderstanding about bytes vs string that didn't matter under Py2 (i.e. the mocked data for subprocess.Popen was returning a String, but the actual Popen call would return bytes.) The fix is to use `universal_newlines=True` and `six.StringIO` so that the tests work on Py2 and Py3. Also we had to patch `subprocess.Popen` in the right place so the mocks are picked up. Closes #2427 from ashb/enable-spark_submit_hook_tests-py3 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/751e936a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/751e936a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/751e936a Branch: refs/heads/master Commit: 751e936ac2d18aec0315d2ed4f307c6b04ea431e Parents: 0dd0029 Author: Ash Berlin-Taylor <ash_git...@firemirror.com> Authored: Mon Jul 17 09:14:49 2017 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Mon Jul 17 09:14:49 2017 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/spark_submit_hook.py | 6 ++++-- tests/contrib/hooks/test_spark_submit_hook.py | 20 ++++++++------------ 2 files changed, 12 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/751e936a/airflow/contrib/hooks/spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index 14e297b..a667753 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -212,9 +212,11 @@ class 
SparkSubmitHook(BaseHook): self._sp = subprocess.Popen(spark_submit_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + bufsize=-1, + universal_newlines=True, **kwargs) - self._process_log(iter(self._sp.stdout.readline, b'')) + self._process_log(iter(self._sp.stdout.readline, '')) returncode = self._sp.wait() if returncode: @@ -232,7 +234,7 @@ class SparkSubmitHook(BaseHook): """ # Consume the iterator for line in itr: - line = line.decode('utf-8').strip() + line = line.strip() # If we run yarn cluster mode, we want to extract the application id from # the logs so we can kill the application when we stop it unexpectedly if self._is_yarn and self._connection['deploy_mode'] == 'cluster': http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/751e936a/tests/contrib/hooks/test_spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py index 6b7da75..826576f 100644 --- a/tests/contrib/hooks/test_spark_submit_hook.py +++ b/tests/contrib/hooks/test_spark_submit_hook.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import six import sys import unittest -from io import StringIO from airflow import configuration, models from airflow.utils import db @@ -63,10 +63,6 @@ class TestSparkSubmitHook(unittest.TestCase): def setUp(self): - if sys.version_info[0] == 3: - raise unittest.SkipTest('TestSparkSubmitHook won\'t work with ' - 'python3. 
No need to test anything here') - configuration.load_test_config() db.merge_conn( models.Connection( @@ -135,11 +131,11 @@ class TestSparkSubmitHook(unittest.TestCase): ] self.assertEquals(expected_build_cmd, cmd) - @patch('subprocess.Popen') + @patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen') def test_spark_process_runcmd(self, mock_popen): # Given - mock_popen.return_value.stdout = StringIO(u'stdout') - mock_popen.return_value.stderr = StringIO(u'stderr') + mock_popen.return_value.stdout = six.StringIO('stdout') + mock_popen.return_value.stderr = six.StringIO('stderr') mock_popen.return_value.wait.return_value = 0 # When @@ -147,7 +143,7 @@ class TestSparkSubmitHook(unittest.TestCase): hook.submit() # Then - self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stdout=-1, stderr=-2)) + self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stderr=-2, stdout=-1, universal_newlines=True, bufsize=-1)) def test_resolve_connection_yarn_default(self): # Given @@ -309,11 +305,11 @@ class TestSparkSubmitHook(unittest.TestCase): self.assertEqual(hook._yarn_application_id, 'application_1486558679801_1820') - @patch('subprocess.Popen') + @patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen') def test_spark_process_on_kill(self, mock_popen): # Given - mock_popen.return_value.stdout = StringIO(u'stdout') - mock_popen.return_value.stderr = StringIO(u'stderr') + mock_popen.return_value.stdout = six.StringIO('stdout') + mock_popen.return_value.stderr = six.StringIO('stderr') mock_popen.return_value.poll.return_value = None mock_popen.return_value.wait.return_value = 0 log_lines = [