Repository: incubator-airflow
Updated Branches:
  refs/heads/master eb994d683 -> 147472b99


[AIRFLOW-1995][Airflow 1995] add on_kill method to SqoopOperator

Closes #2936 from Acehaidrey/AIRFLOW-1995


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/147472b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/147472b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/147472b9

Branch: refs/heads/master
Commit: 147472b99babdbc67e63f784968864703e168562
Parents: eb994d6
Author: Ace Haidrey <[email protected]>
Authored: Fri Jan 12 15:08:29 2018 +0100
Committer: Fokko Driesprong <[email protected]>
Committed: Fri Jan 12 15:08:29 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/sqoop_hook.py         | 15 ++++++-----
 airflow/contrib/operators/sqoop_operator.py | 34 +++++++++++++++---------
 2 files changed, 30 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/147472b9/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 578b527..102b599 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -88,21 +88,22 @@ class SqoopHook(BaseHook, LoggingMixin):
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         :return: handle to subprocess
         """
-        self.log.info("Executing command: {}".format(' '.join(self.cmd_mask_password(cmd))))
-        sp = subprocess.Popen(cmd,
+        masked_cmd = ' '.join(self.cmd_mask_password(cmd))
+        self.log.info("Executing command: {}".format(masked_cmd))
+        self.sp = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               **kwargs)
 
-        for line in iter(sp.stdout):
+        for line in iter(self.sp.stdout):
             self.log.info(line.strip())
 
-        sp.wait()
+        self.sp.wait()
 
-        self.log.info("Command exited with return code %s", sp.returncode)
+        self.log.info("Command exited with return code %s", self.sp.returncode)
 
-        if sp.returncode:
-            raise AirflowException("Sqoop command failed: {}".format(' '.join(self.cmd_mask_password(cmd))))
+        if self.sp.returncode:
+            raise AirflowException("Sqoop command failed: {}".format(masked_cmd))
 
     def _prepare_command(self, export=False):
         sqoop_cmd_type = "export" if export else "import"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/147472b9/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
index cdaf336..cc0d646 100644
--- a/airflow/contrib/operators/sqoop_operator.py
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -16,6 +16,8 @@
 """
 This module contains a sqoop 1 operator
 """
+import os
+import signal
 
 from airflow.contrib.hooks.sqoop_hook import SqoopHook
 from airflow.exceptions import AirflowException
@@ -148,22 +150,24 @@ class SqoopOperator(BaseOperator):
         self.hcatalog_table = hcatalog_table
         self.create_hcatalog_table = create_hcatalog_table
         self.properties = properties
-        self.extra_import_options = extra_import_options
-        self.extra_export_options = extra_export_options
+        self.extra_import_options = extra_import_options or {}
+        self.extra_export_options = extra_export_options or {}
 
     def execute(self, context):
         """
         Execute sqoop job
         """
-        hook = SqoopHook(conn_id=self.conn_id,
-                         verbose=self.verbose,
-                         num_mappers=self.num_mappers,
-                         hcatalog_database=self.hcatalog_database,
-                         hcatalog_table=self.hcatalog_table,
-                         properties=self.properties)
+        self.hook = SqoopHook(
+            conn_id=self.conn_id,
+            verbose=self.verbose,
+            num_mappers=self.num_mappers,
+            hcatalog_database=self.hcatalog_database,
+            hcatalog_table=self.hcatalog_table,
+            properties=self.properties
+        )
 
         if self.cmd_type == 'export':
-            hook.export_table(
+            self.hook.export_table(
                 table=self.table,
                 export_dir=self.export_dir,
                 input_null_string=self.input_null_string,
@@ -185,10 +189,12 @@ class SqoopOperator(BaseOperator):
                 self.extra_import_options['create-hcatalog-table'] = ''
 
             if self.table and self.query:
-                raise AirflowException('Cannot specify query and table together. Need to specify either or.')
+                raise AirflowException(
+                    'Cannot specify query and table together. Need to specify either or.'
+                )
 
             if self.table:
-                hook.import_table(
+                self.hook.import_table(
                     table=self.table,
                     target_dir=self.target_dir,
                     append=self.append,
@@ -200,7 +206,7 @@ class SqoopOperator(BaseOperator):
                     driver=self.driver,
                     extra_import_options=self.extra_import_options)
             elif self.query:
-                hook.import_query(
+                self.hook.import_query(
                     query=self.query,
                     target_dir=self.target_dir,
                     append=self.append,
@@ -215,3 +221,7 @@ class SqoopOperator(BaseOperator):
                 )
         else:
             raise AirflowException("cmd_type should be 'import' or 'export'")
+
+    def on_kill(self):
+        self.log.info('Sending SIGTERM signal to bash process group')
+        os.killpg(os.getpgid(self.hook.sp.pid), signal.SIGTERM)
