Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8d501b0ce -> 717a4aeee
[AIRFLOW-216] Add Sqoop Hook and Operator

This patch adds a Sqoop hook and operator that implements Sqoop import.
The hook is a wrapper around the sqoop 1 binary.

* Closes #1576 from jwi078/sqoop_operator

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/717a4aee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/717a4aee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/717a4aee

Branch: refs/heads/master
Commit: 717a4aeee3bd0883067c82a3174ad8e2b7dbe8ec
Parents: 8d501b0
Author: JohanW <johan...@gmail.com>
Authored: Tue Jun 14 13:14:55 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jun 14 13:14:59 2016 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/sqoop_hook.py         | 223 +++++++++++++++++++++++
 airflow/contrib/operators/sqoop_operator.py |  91 +++++++++
 2 files changed, 314 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/717a4aee/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
new file mode 100644
index 0000000..f213fc4
--- /dev/null
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -0,0 +1,223 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module contains a sqoop 1 hook
+"""
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowException
+
+import logging
+import subprocess
+
+log = logging.getLogger(__name__)
+
+
+class SqoopHook(BaseHook):
+    """
+    This hook is a wrapper around the sqoop 1 binary. To be able to use the
+    hook, the "sqoop" binary must be in the PATH.
+
+    :param hive_home: (from json) The location of hive-site.xml
+    :type hive_home: str
+    :param job_tracker: (from json) <local|jobtracker:port> specify a job tracker
+    :type job_tracker: str
+    :param namenode: (from json) specify a namenode
+    :type namenode: str
+    :param lib_jars: (from json) specify comma separated jar files to
+        include in the classpath
+    :type lib_jars: str
+    :param files: (from json) specify comma separated files to be copied
+        to the map reduce cluster
+    :type files: str
+    :param archives: (from json) specify comma separated archives to be
+        unarchived on the compute machines.
+    :type archives: str
+    """
+    def __init__(self, conn_id='sqoop_default'):
+        conn = self.get_connection(conn_id)
+        self.hive_home = conn.extra_dejson.get('hive_home', None)
+        self.job_tracker = conn.extra_dejson.get('job_tracker', None)
+        self.namenode = conn.extra_dejson.get('namenode', None)
+        self.lib_jars = conn.extra_dejson.get('libjars', None)
+        self.files = conn.extra_dejson.get('files', None)
+        self.archives = conn.extra_dejson.get('archives', None)
+        self.conn = conn
+
+    def get_conn(self):
+        pass
+
+    def Popen(self, cmd, export=False, **kwargs):
+        """
+        Remote Popen
+
+        :param cmd: command to remotely execute
+        :param export: launch a "sqoop export" instead of a "sqoop import"
+        :param kwargs: extra arguments to Popen (see subprocess.Popen)
+        :return: handle to subprocess
+        """
+        prefixed_cmd = self._prepare_command(cmd, export=export)
+        log.info("Executing: %s", prefixed_cmd)
+        return subprocess.Popen(prefixed_cmd, **kwargs)
+
+    def _prepare_command(self, cmd, export=False):
+        if export:
+            connection_cmd = ["sqoop", "export", "--verbose"]
+        else:
+            connection_cmd = ["sqoop", "import", "--verbose"]
+
+        if self.job_tracker:
+            connection_cmd += ["-jt", self.job_tracker]
+        if self.conn.login:
+            connection_cmd += ["--username", self.conn.login]
+        # todo: put this in a password file
+        if self.conn.password:
+            connection_cmd += ["--password", self.conn.password]
+        if self.lib_jars:
+            connection_cmd += ["-libjars", self.lib_jars]
+        if self.files:
+            connection_cmd += ["-files", self.files]
+        if self.namenode:
+            connection_cmd += ["-fs", self.namenode]
+        if self.archives:
+            connection_cmd += ["-archives", self.archives]
+
+        connection_cmd += ["--connect", "{}:{}/{}".format(self.conn.host,
+                                                          self.conn.port,
+                                                          self.conn.schema)]
+        connection_cmd += cmd
+
+        return connection_cmd
+
+    def _import_cmd(self, target_dir, append=False, type="text",
+                    num_mappers=None, split_by=None):
+        cmd = ["--target-dir", target_dir]
+
+        if not num_mappers:
+            num_mappers = 1
+
+        cmd += ["--num-mappers", str(num_mappers)]
+
+        if split_by:
+            cmd += ["--split-by", split_by]
+
+        if append:
+            cmd += ["--append"]
+
+        if type == "avro":
+            cmd += ["--as-avrodatafile"]
+        elif type == "sequence":
+            cmd += ["--as-sequencefile"]
+        else:
+            cmd += ["--as-textfile"]
+
+        return cmd
+
+    def import_table(self, table, target_dir, append=False, type="text",
+                     columns=None, num_mappers=None, split_by=None,
+                     where=None):
+        """
+        Imports a table from a remote location into the target dir. Arguments
+        are copies of direct sqoop command line arguments.
+
+        :param table: Table to read
+        :param target_dir: HDFS destination dir
+        :param append: Append data to an existing dataset in HDFS
+        :param type: "avro", "sequence" or "text". Imports data in the
+            specified format. Defaults to text.
+        :param columns: <col,col,col...> Columns to import from table
+        :param num_mappers: Use n map tasks to import in parallel
+        :param split_by: Column of the table used to split work units
+        :param where: WHERE clause to use during import
+        """
+        cmd = self._import_cmd(target_dir, append, type,
+                               num_mappers, split_by)
+        cmd += ["--table", table]
+        if columns:
+            cmd += ["--columns", columns]
+        if where:
+            cmd += ["--where", where]
+
+        p = self.Popen(cmd, export=False,
+                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        output, stderr = p.communicate()
+
+        if p.returncode != 0:
+            # TODO: replace with RemoteCalledProcessError(p.returncode, cmd,
+            # self.conn.host, output=output)
+            raise AirflowException("Cannot execute {} on {}. Error code is: "
+                                   "{}. Output: {}, Stderr: {}"
+                                   .format(cmd, self.conn.host,
+                                           p.returncode, output, stderr))
+
+    def _export_cmd(self, export_dir, num_mappers=None):
+        cmd = ["--export-dir", export_dir]
+
+        if not num_mappers:
+            num_mappers = 1
+
+        cmd += ["--num-mappers", str(num_mappers)]
+
+        return cmd
+
+    def export_table(self, table, export_dir, num_mappers=None):
+        """
+        Exports a Hive table to a remote location. Arguments are copies of
+        direct sqoop command line arguments.
+
+        :param table: Table to write to in the remote database
+        :param export_dir: HDFS directory holding the Hive data to export
+        :param num_mappers: Use n map tasks to export in parallel
+        """
+        cmd = self._export_cmd(export_dir, num_mappers)
+        cmd += ["--table", table]
+
+        p = self.Popen(cmd, export=True,
+                       stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        output, stderr = p.communicate()
+
+        if p.returncode != 0:
+            # TODO: replace with RemoteCalledProcessError(p.returncode, cmd,
+            # self.conn.host, output=output)
+            raise AirflowException("Cannot execute {} on {}. Error code is: "
+                                   "{}. Output: {}, Stderr: {}"
+                                   .format(cmd, self.conn.host,
+                                           p.returncode, output, stderr))
+
+    def import_query(self, query, target_dir, append=False, type="text",
+                     num_mappers=None, split_by=None):
+        """
+        Imports the result of a free-form query into the target dir.
+
+        :param query: Free format query to run
+        :param target_dir: HDFS destination dir
+        :param append: Append data to an existing dataset in HDFS
+        :param type: "avro", "sequence" or "text". Imports data in the
+            specified format. Defaults to text.
+        :param num_mappers: Use n map tasks to import in parallel
+        :param split_by: Column of the table used to split work units
+        """
+        cmd = self._import_cmd(target_dir, append, type,
+                               num_mappers, split_by)
+        cmd += ["--query", query]
+
+        p = self.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        output, stderr = p.communicate()
+
+        if p.returncode != 0:
+            # TODO: replace with RemoteCalledProcessError(p.returncode, cmd,
+            # self.conn.host, output=output)
+            raise AirflowException("Cannot execute {} on {}. Error code is: "
+                                   "{}. Output: {}, Stderr: {}"
+                                   .format(cmd, self.conn.host,
+                                           p.returncode, output, stderr))


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/717a4aee/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
new file mode 100644
index 0000000..8bf1c05
--- /dev/null
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module contains a sqoop 1 operator
+"""
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.sqoop_hook import SqoopHook
+
+
+class SqoopOperator(BaseOperator):
+    """
+    Execute a Sqoop job.
+    """
+    @apply_defaults
+    def __init__(self,
+                 conn_id='sqoop_default',
+                 type_cmd='import',
+                 table='',
+                 target_dir=None,
+                 append=None,
+                 type=None,
+                 columns=None,
+                 num_mappers=1,
+                 split_by=None,
+                 where=None,
+                 export_dir=None,
+                 *args,
+                 **kwargs):
+        """
+        :param conn_id: id of the connection to use (default 'sqoop_default')
+        :param type_cmd: command to execute, "export" or "import"
+        :param table: Table to read
+        :param target_dir: HDFS destination dir
+        :param append: Append data to an existing dataset in HDFS
+        :param type: "avro", "sequence" or "text". Imports data in the
+            specified format. Defaults to text.
+        :param columns: <col,col,col> Columns to import from table
+        :param num_mappers: Use n map tasks to import/export in parallel
+        :param split_by: Column of the table used to split work units
+        :param where: WHERE clause to use during import
+        :param export_dir: HDFS Hive database directory to export
+        """
+        super(SqoopOperator, self).__init__(*args, **kwargs)
+        self.conn_id = conn_id
+        self.type_cmd = type_cmd
+        self.table = table
+        self.target_dir = target_dir
+        self.append = append
+        self.type = type
+        self.columns = columns
+        self.num_mappers = num_mappers
+        self.split_by = split_by
+        self.where = where
+        self.export_dir = export_dir
+
+    def execute(self, context):
+        """
+        Execute the Sqoop job.
+        """
+        hook = SqoopHook(conn_id=self.conn_id)
+
+        if self.type_cmd == 'export':
+            hook.export_table(
+                table=self.table,
+                export_dir=self.export_dir,
+                num_mappers=self.num_mappers)
+        else:
+            hook.import_table(
+                table=self.table,
+                target_dir=self.target_dir,
+                append=self.append,
+                type=self.type,
+                columns=self.columns,
+                num_mappers=self.num_mappers,
+                split_by=self.split_by,
+                where=self.where)
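----------------------------------------------------------------------

For reference, a minimal usage sketch of the new operator (not part of the
commit; the DAG id, table, paths and schedule below are illustrative
assumptions). It assumes a `sqoop_default` connection pointing at the source
database, with any of the optional JSON keys the hook reads (hive_home,
job_tracker, namenode, libjars, files, archives) in the connection's extra
field, e.g. {"namenode": "hdfs://namenode:8020"}. Note that the hook passes
conn.host verbatim to --connect, so the host should carry the full JDBC
prefix (e.g. jdbc:mysql://mysqlhost):

    # example_sqoop_dag.py -- illustrative only
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.sqoop_operator import SqoopOperator

    dag = DAG('example_sqoop_import',  # hypothetical dag id
              start_date=datetime(2016, 6, 14),
              schedule_interval='@daily')

    # Import the hypothetical "orders" table into HDFS as Avro, splitting
    # the work over four mappers on the "id" column.
    import_orders = SqoopOperator(
        task_id='sqoop_import_orders',
        conn_id='sqoop_default',
        type_cmd='import',
        table='orders',
        target_dir='/user/airflow/orders',
        type='avro',
        num_mappers=4,
        split_by='id',
        dag=dag)

With this configuration the hook shells out roughly:
sqoop import --verbose --connect <host>:<port>/<schema> --target-dir
/user/airflow/orders --num-mappers 4 --split-by id --as-avrodatafile
--table orders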