Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8d501b0ce -> 717a4aeee


[AIRFLOW-216] Add Sqoop Hook and Operator

This patch adds a Sqoop hook and operator that implement Sqoop imports and exports.
The hook is a wrapper around the sqoop 1 binary.

* Closes #1576 from jwi078/sqoop_operator
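
For context, the hook reads everything it needs from an Airflow connection
(conn_id "sqoop_default" by default): host, port, schema, login and password
feed the --connect/--username/--password arguments, while the optional settings
come from the connection's extra JSON. A minimal sketch with purely illustrative
values follows (not part of this patch; in practice the connection would be
configured through the Airflow UI):

from airflow.models import Connection

# Illustrative connection the new hook could read; all values are placeholders.
sqoop_conn = Connection(
    conn_id='sqoop_default',
    host='jdbc:mysql://mysql.example.com',  # "--connect" is built as host:port/schema
    port=3306,
    schema='sales_db',
    login='sqoop_user',
    password='sqoop_pass',
    extra='{"namenode": "hdfs://namenode:8020",'
          ' "job_tracker": "jobtracker.example.com:8021",'
          ' "libjars": "/opt/sqoop/lib/extra.jar",'
          ' "hive_home": "/usr/lib/hive"}')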


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/717a4aee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/717a4aee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/717a4aee

Branch: refs/heads/master
Commit: 717a4aeee3bd0883067c82a3174ad8e2b7dbe8ec
Parents: 8d501b0
Author: JohanW <johan...@gmail.com>
Authored: Tue Jun 14 13:14:55 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jun 14 13:14:59 2016 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/sqoop_hook.py         | 223 +++++++++++++++++++++++
 airflow/contrib/operators/sqoop_operator.py |  91 +++++++++
 2 files changed, 314 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/717a4aee/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
new file mode 100644
index 0000000..f213fc4
--- /dev/null
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -0,0 +1,223 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module contains a sqoop 1 hook
+"""
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.exceptions import AirflowException
+
+import logging
+import subprocess
+
+log = logging.getLogger(__name__)
+
+
+class SqoopHook(BaseHook):
+    """
+    This Hook is a wrapper around the sqoop 1 binary. To be able to use the hook
+    it is required that "sqoop" is in the PATH.
+    :param hive_home: (from json) The location of hive-site.xml
+    :type hive_home: str
+    :param job_tracker: (from json) <local|jobtracker:port> specify a job tracker
+    :type job_tracker: str
+    :param namenode: (from json) specify a namenode
+    :type namenode: str
+    :param lib_jars: (from json) specify comma separated jar files to
+        include in the classpath.
+    :type lib_jars: str
+    :param files: (from json) specify comma separated files to be copied
+        to the map reduce cluster
+    :type files: str
+    :param archives: (from json) specify comma separated archives to be
+        unarchived on the compute machines.
+    :type archives: str
+    """
+    def __init__(self, conn_id='sqoop_default'):
+        conn = self.get_connection(conn_id)
+        self.hive_home = conn.extra_dejson.get('hive_home', None)
+        self.job_tracker = conn.extra_dejson.get('job_tracker', None)
+        self.namenode = conn.extra_dejson.get('namenode', None)
+        self.lib_jars = conn.extra_dejson.get('libjars', None)
+        self.files = conn.extra_dejson.get('files', None)
+        self.archives = conn.extra_dejson.get('archives', None)
+        self.conn = conn
+
+    def get_conn(self):
+        pass
+
+    def Popen(self, cmd, export=False, **kwargs):
+        """
+        Remote Popen
+
+        :param cmd: command to remotely execute
+        :param kwargs: extra arguments to Popen (see subprocess.Popen)
+        :return: handle to subprocess
+        """
+        prefixed_cmd = self._prepare_command(cmd, export=export)
+        log.info(prefixed_cmd)
+        return subprocess.Popen(prefixed_cmd, **kwargs)
+
+    def _prepare_command(self, cmd, export=False):
+
+        connection_cmd = []
+
+        if export:
+            connection_cmd = ["sqoop", "export", "--verbose"]
+        else:
+            connection_cmd = ["sqoop", "import", "--verbose"]
+
+        if self.job_tracker:
+            connection_cmd += ["-jt", self.job_tracker]
+        if self.conn.login:
+            connection_cmd += ["--username", self.conn.login]
+        # todo: put this in a password file
+        if self.conn.password:
+            connection_cmd += ["--password", self.conn.password]
+        if self.lib_jars:
+            connection_cmd += ["-libjars", self.lib_jars]
+        if self.files:
+            connection_cmd += ["-files", self.files]
+        if self.namenode:
+            connection_cmd += ["-fs", self.namenode]
+        if self.archives:
+            connection_cmd += ["-archives", self.archives]
+
+        connection_cmd += ["--connect", "{}:{}/{}".format(self.conn.host, self.conn.port, self.conn.schema)]
+        connection_cmd += cmd
+
+        return connection_cmd
+
+    def _import_cmd(self, target_dir,
+                    append=False, type="text",
+                    num_mappers=None, split_by=None):
+
+        cmd = ["--target-dir", target_dir]
+
+        if not num_mappers:
+            num_mappers = 1
+
+        cmd += ["--num-mappers", str(num_mappers)]
+
+        if split_by:
+            cmd += ["--split-by", split_by]
+
+        if append:
+            cmd += ["--append"]
+
+        if type == "avro":
+            cmd += ["--as-avrodatafile"]
+        elif type == "sequence":
+            cmd += ["--as-sequencefile"]
+        else:
+            cmd += ["--as-textfile"]
+
+        return cmd
+
+    def import_table(self, table, target_dir,
+                     append=False, type="text", columns=None,
+                     num_mappers=None, split_by=None, where=None):
+        """
+        Imports a table from a remote location into the target dir. Arguments
+        are copies of the direct sqoop command line arguments.
+        :param table: Table to read
+        :param target_dir: HDFS destination dir
+        :param append: Append data to an existing dataset in HDFS
+        :param type: "avro", "sequence", "text" Imports data into the specified
+            format. Defaults to text.
+        :param columns: <col,col,col…> Columns to import from table
+        :param num_mappers: Use n map tasks to import in parallel
+        :param split_by: Column of the table used to split work units
+        :param where: WHERE clause to use during import
+        """
+        cmd = self._import_cmd(target_dir, append, type,
+                               num_mappers, split_by)
+        cmd += ["--table", table]
+        if columns:
+            cmd += ["--columns", columns]
+        if where:
+            cmd += ["--where", where]
+
+        p = self.Popen(cmd, export=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        output, stderr = p.communicate()
+
+        if p.returncode != 0:
+            # I like this better: RemoteCalledProcessError(p.returncode, cmd, self.host, output=output)
+            raise AirflowException("Cannot execute {} on {}. Error code is: "
+                                   "{}. Output: {}, Stderr: {}"
+                                   .format(cmd, self.conn.host,
+                                           p.returncode, output, stderr))
+
+    def _export_cmd(self, export_dir, num_mappers=None):
+
+        cmd = ["--export-dir", export_dir]
+
+        if not num_mappers:
+            num_mappers = 1
+
+        cmd += ["--num-mappers", str(num_mappers)]
+
+        return cmd
+
+    def export_table(self, table, export_dir,
+                     num_mappers=None):
+        """
+        Exports a Hive table to a remote location. Arguments are copies of the
+        direct sqoop command line arguments.
+        :param table: Remote destination table to write to
+        :param export_dir: HDFS path of the Hive table data to export
+        :param num_mappers: Use n map tasks to export in parallel
+        """
+
+        cmd = self._export_cmd(export_dir, num_mappers)
+        cmd += ["--table", table]
+
+        p = self.Popen(cmd, export=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        output, stderr = p.communicate()
+
+        if p.returncode != 0:
+            # I like this better: RemoteCalledProcessError(p.returncode, cmd, self.host, output=output)
+            raise AirflowException("Cannot execute {} on {}. Error code is: "
+                                   "{}. Output: {}, Stderr: {}"
+                                   .format(cmd, self.conn.host,
+                                           p.returncode, output, stderr))
+
+    def import_query(self, query, target_dir,
+                     append=False, type="text",
+                     num_mappers=None, split_by=None):
+        """
+        Imports the result of a free-form query into the target dir.
+        :param query: Free format query to run
+        :param target_dir: HDFS destination dir
+        :param append: Append data to an existing dataset in HDFS
+        :param type: "avro", "sequence", "text" Imports data into the specified
+            format. Defaults to text.
+        :param num_mappers: Use n map tasks to import in parallel
+        :param split_by: Column of the table used to split work units
+        """
+        cmd = self._import_cmd(target_dir, append, type,
+                               num_mappers, split_by)
+        cmd += ["--query", query]
+
+        p = self.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        output, stderr = p.communicate()
+
+        if p.returncode != 0:
+            # I like this better: RemoteCalledProcessError(p.returncode, cmd, self.host, output=output)
+            raise AirflowException("Cannot execute {} on {}. Error code is: "
+                                   "{}. Output: {}, Stderr: {}"
+                                   .format(cmd, self.conn.host,
+                                           p.returncode, output, stderr))

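As a quick illustration of the hook's surface (the connection id, table names
and HDFS paths below are hypothetical, and the sqoop binary must be on the
worker's PATH), it can be driven directly like this:

from airflow.contrib.hooks.sqoop_hook import SqoopHook

hook = SqoopHook(conn_id='sqoop_default')

# Pull the "orders" table into HDFS as Avro, 4 mappers, splitting work on "id".
hook.import_table(
    table='orders',
    target_dir='/user/airflow/sqoop/orders',
    append=False,
    type='avro',
    columns='id,customer_id,amount',
    num_mappers=4,
    split_by='id',
    where='amount > 0')

# Push an HDFS directory back out to the remote "orders_summary" table.
hook.export_table(
    table='orders_summary',
    export_dir='/user/hive/warehouse/orders_summary',
    num_mappers=2)
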
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/717a4aee/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
new file mode 100644
index 0000000..8bf1c05
--- /dev/null
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module contains a sqoop 1 operator
+"""
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.sqoop_hook import SqoopHook
+
+
+class SqoopOperator(BaseOperator):
+    """
+    Execute a Sqoop import or export job via the SqoopHook.
+    """
+    @apply_defaults
+    def __init__(self,
+                 conn_id='sqoop_default',
+                 type_cmd='import',
+                 table='',
+                 target_dir=None,
+                 append=None,
+                 type=None,
+                 columns=None,
+                 num_mappers='1',
+                 split_by=None,
+                 where=None,
+                 export_dir=None,
+                 *args,
+                 **kwargs):
+        """
+        :param conn_id: Reference to the sqoop connection (default "sqoop_default")
+        :param type_cmd: Command to execute, either "export" or "import"
+        :param table: Table to read
+        :param target_dir: HDFS destination dir
+        :param append: Append data to an existing dataset in HDFS
+        :param type: "avro", "sequence", "text" Imports data into the specified
+            format. Defaults to text.
+        :param columns: <col,col,col> Columns to import from table
+        :param num_mappers: Use n map tasks to import/export in parallel
+        :param split_by: Column of the table used to split work units
+        :param where: WHERE clause to use during import
+        :param export_dir: HDFS Hive database directory to export
+        """
+        super(SqoopOperator, self).__init__(*args, **kwargs)
+        self.conn_id = conn_id
+        self.type_cmd = type_cmd
+        self.table = table
+        self.target_dir = target_dir
+        self.append = append
+        self.type = type
+        self.columns = columns
+        self.num_mappers = num_mappers
+        self.split_by = split_by
+        self.where = where
+        self.export_dir = export_dir
+
+    def execute(self, context):
+        """
+        Execute sqoop job
+        """
+        hook = SqoopHook(conn_id=self.conn_id)
+
+        if self.type_cmd == 'export':
+            hook.export_table(
+                table=self.table,
+                export_dir=self.export_dir,
+                num_mappers=self.num_mappers)
+        else:
+            hook.import_table(
+                table=self.table,
+                target_dir=self.target_dir,
+                append=self.append,
+                type=self.type,
+                columns=self.columns,
+                num_mappers=self.num_mappers,
+                split_by=self.split_by,
+                where=self.where)
