Repository: incubator-airflow Updated Branches: refs/heads/master efdc4d3b4 -> 1d531555e
[AIRFLOW-1734][Airflow 1734] Sqoop hook/operator enhancements Closes #2703 from Acehaidrey/sqoop_contrib_fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1d531555 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1d531555 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1d531555 Branch: refs/heads/master Commit: 1d531555ecd594ee7ec2c5d3fc87f8d4bcc2c27e Parents: efdc4d3 Author: Ace Haidrey <[email protected]> Authored: Sat Oct 28 15:07:56 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Sat Oct 28 15:07:56 2017 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/sqoop_hook.py | 91 ++++++++------ airflow/contrib/operators/sqoop_operator.py | 53 +++++++-- tests/contrib/hooks/test_sqoop_hook.py | 113 +++++++++++++----- tests/contrib/operators/test_sqoop_operator.py | 124 ++++++++++++++++++-- 4 files changed, 293 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/airflow/contrib/hooks/sqoop_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py index 5b00b15..c56fbcb 100644 --- a/airflow/contrib/hooks/sqoop_hook.py +++ b/airflow/contrib/hooks/sqoop_hook.py @@ -43,7 +43,7 @@ class SqoopHook(BaseHook, LoggingMixin): :param verbose: Set sqoop to verbose. :type verbose: bool :param num_mappers: Number of map tasks to import in parallel. - :type num_mappers: str + :type num_mappers: int :param properties: Properties to set via the -D argument :type properties: dict """ @@ -52,8 +52,6 @@ class SqoopHook(BaseHook, LoggingMixin): num_mappers=None, hcatalog_database=None, hcatalog_table=None, properties=None): # No mutable types in the default parameters - if properties is None: - properties = {} self.conn = self.get_connection(conn_id) connection_parameters = self.conn.extra_dejson self.job_tracker = connection_parameters.get('job_tracker', None) @@ -66,10 +64,11 @@ class SqoopHook(BaseHook, LoggingMixin): self.hcatalog_table = hcatalog_table self.verbose = verbose self.num_mappers = num_mappers - self.properties = properties + self.properties = properties or {} + self.log.info("Using connection to: {}:{}/{}".format(self.conn.host, self.conn.port, self.conn.schema)) def get_conn(self): - pass + return self.conn def cmd_mask_password(self, cmd): try: @@ -87,7 +86,7 @@ class SqoopHook(BaseHook, LoggingMixin): :param kwargs: extra arguments to Popen (see subprocess.Popen) :return: handle to subprocess """ - self.log.info("Executing command: %s", ' '.join(cmd)) + self.log.info("Executing command: {}".format(' '.join(self.cmd_mask_password(cmd)))) sp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -101,32 +100,33 @@ class SqoopHook(BaseHook, LoggingMixin): self.log.info("Command exited with return code %s", sp.returncode) if sp.returncode: - raise AirflowException("Sqoop command failed: %s", ' '.join(cmd)) + raise AirflowException("Sqoop command failed: {}".format(' '.join(self.cmd_mask_password(cmd)))) def _prepare_command(self, export=False): - if export: - connection_cmd = ["sqoop", "export"] - else: - connection_cmd = ["sqoop", "import"] + sqoop_cmd_type = "export" if export else "import" + connection_cmd = ["sqoop", sqoop_cmd_type] - if self.verbose: - connection_cmd += ["--verbose"] + for key, value in self.properties.items(): + connection_cmd += ["-D", "{}={}".format(key, value)] + + if self.namenode: + connection_cmd += ["-fs", self.namenode] if self.job_tracker: connection_cmd += ["-jt", self.job_tracker] - if self.conn.login: - connection_cmd += ["--username", self.conn.login] - if self.conn.password: - connection_cmd += ["--password", self.conn.password] - if self.password_file: - connection_cmd += ["--password-file", self.password_file] if self.libjars: connection_cmd += ["-libjars", self.libjars] if self.files: connection_cmd += ["-files", self.files] - if self.namenode: - connection_cmd += ["-fs", self.namenode] if self.archives: connection_cmd += ["-archives", self.archives] + if self.conn.login: + connection_cmd += ["--username", self.conn.login] + if self.conn.password: + connection_cmd += ["--password", self.conn.password] + if self.password_file: + connection_cmd += ["--password-file", self.password_file] + if self.verbose: + connection_cmd += ["--verbose"] if self.num_mappers: connection_cmd += ["--num-mappers", str(self.num_mappers)] if self.hcatalog_database: @@ -134,9 +134,6 @@ class SqoopHook(BaseHook, LoggingMixin): if self.hcatalog_table: connection_cmd += ["--hcatalog-table", self.hcatalog_table] - for key, value in self.properties.items(): - connection_cmd += ["-D", "{}={}".format(key, value)] - connection_cmd += ["--connect", "{}:{}/{}".format( self.conn.host, self.conn.port, @@ -145,7 +142,8 @@ class SqoopHook(BaseHook, LoggingMixin): return connection_cmd - def _get_export_format_argument(self, file_type='text'): + @staticmethod + def _get_export_format_argument(file_type='text'): if file_type == "avro": return ["--as-avrodatafile"] elif file_type == "sequence": @@ -159,11 +157,12 @@ class SqoopHook(BaseHook, LoggingMixin): "'sequence', 'parquet' or 'text'.") def _import_cmd(self, target_dir, append, file_type, split_by, direct, - driver): + driver, extra_import_options): cmd = self._prepare_command(export=False) - cmd += ["--target-dir", target_dir] + if target_dir: + cmd += ["--target-dir", target_dir] if append: cmd += ["--append"] @@ -179,11 +178,16 @@ class SqoopHook(BaseHook, LoggingMixin): if driver: cmd += ["--driver", driver] + if extra_import_options: + for key, value in extra_import_options.items(): + cmd += ['--{}'.format(key)] + if value: cmd += [value] + return cmd - def import_table(self, table, target_dir, append=False, file_type="text", + def import_table(self, table, target_dir=None, append=False, file_type="text", columns=None, split_by=None, where=None, direct=False, - driver=None): + driver=None, extra_import_options=None): """ Imports table from remote location to target dir. Arguments are copies of direct sqoop command line arguments @@ -197,9 +201,12 @@ class SqoopHook(BaseHook, LoggingMixin): :param where: WHERE clause to use during import :param direct: Use direct connector if exists for the database :param driver: Manually specify JDBC driver class to use + :param extra_import_options: Extra import options to pass as dict. + If a key doesn't have a value, just pass an empty string to it. + Don't include prefix of -- for sqoop options. """ cmd = self._import_cmd(target_dir, append, file_type, split_by, direct, - driver) + driver, extra_import_options) cmd += ["--table", table] @@ -210,9 +217,8 @@ class SqoopHook(BaseHook, LoggingMixin): self.Popen(cmd) - def import_query(self, query, target_dir, - append=False, file_type="text", - split_by=None, direct=None, driver=None): + def import_query(self, query, target_dir, append=False, file_type="text", + split_by=None, direct=None, driver=None, extra_import_options=None): """ Imports a specific query from the rdbms to hdfs :param query: Free format query to run @@ -223,9 +229,12 @@ class SqoopHook(BaseHook, LoggingMixin): :param split_by: Column of the table used to split work units :param direct: Use direct import fast path :param driver: Manually specify JDBC driver class to use + :param extra_import_options: Extra import options to pass as dict. + If a key doesn't have a value, just pass an empty string to it. + Don't include prefix of -- for sqoop options. """ cmd = self._import_cmd(target_dir, append, file_type, split_by, direct, - driver) + driver, extra_import_options) cmd += ["--query", query] self.Popen(cmd) @@ -234,7 +243,7 @@ class SqoopHook(BaseHook, LoggingMixin): input_null_non_string, staging_table, clear_staging_table, enclosed_by, escaped_by, input_fields_terminated_by, input_lines_terminated_by, input_optionally_enclosed_by, - batch, relaxed_isolation): + batch, relaxed_isolation, extra_export_options): cmd = self._prepare_command(export=True) @@ -275,6 +284,11 @@ class SqoopHook(BaseHook, LoggingMixin): if export_dir: cmd += ["--export-dir", export_dir] + if extra_export_options: + for key, value in extra_export_options.items(): + cmd += ['--{}'.format(key)] + if value: cmd += [value] + # The required option cmd += ["--table", table] @@ -286,7 +300,7 @@ class SqoopHook(BaseHook, LoggingMixin): escaped_by, input_fields_terminated_by, input_lines_terminated_by, input_optionally_enclosed_by, batch, - relaxed_isolation): + relaxed_isolation, extra_export_options=None): """ Exports Hive table to remote location. Arguments are copies of direct sqoop command line Arguments @@ -308,6 +322,9 @@ class SqoopHook(BaseHook, LoggingMixin): :param batch: Use batch mode for underlying statement execution :param relaxed_isolation: Transaction isolation to read uncommitted for the mappers + :param extra_export_options: Extra export options to pass as dict. + If a key doesn't have a value, just pass an empty string to it. + Don't include prefix of -- for sqoop options. """ cmd = self._export_cmd(table, export_dir, input_null_string, input_null_non_string, staging_table, @@ -315,6 +332,6 @@ class SqoopHook(BaseHook, LoggingMixin): input_fields_terminated_by, input_lines_terminated_by, input_optionally_enclosed_by, batch, - relaxed_isolation) + relaxed_isolation, extra_export_options) self.Popen(cmd) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/airflow/contrib/operators/sqoop_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py index c3da176..cdaf336 100644 --- a/airflow/contrib/operators/sqoop_operator.py +++ b/airflow/contrib/operators/sqoop_operator.py @@ -25,8 +25,15 @@ from airflow.utils.decorators import apply_defaults class SqoopOperator(BaseOperator): """ - execute sqoop job + Execute a Sqoop job. + Documentation for Apache Sqoop can be found here: https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html. """ + template_fields = ('conn_id', 'cmd_type', 'table', 'query', 'target_dir', 'file_type', 'columns', 'split_by', + 'where', 'export_dir', 'input_null_string', 'input_null_non_string', 'staging_table', + 'enclosed_by', 'escaped_by', 'input_fields_terminated_by', 'input_lines_terminated_by', + 'input_optionally_enclosed_by', 'properties', 'extra_import_options', 'driver', + 'extra_export_options', 'hcatalog_database', 'hcatalog_table',) + ui_color = '#7D8CA4' @apply_defaults def __init__(self, @@ -36,7 +43,7 @@ class SqoopOperator(BaseOperator): query=None, target_dir=None, append=None, - file_type=None, + file_type='text', columns=None, num_mappers=None, split_by=None, @@ -59,12 +66,18 @@ class SqoopOperator(BaseOperator): properties=None, hcatalog_database=None, hcatalog_table=None, + create_hcatalog_table=False, + extra_import_options=None, + extra_export_options=None, *args, **kwargs): """ :param conn_id: str :param cmd_type: str specify command to execute "export" or "import" :param table: Table to read + :param query: Import result of arbitrary SQL query. Instead of using the table, + columns and where arguments, you can specify a SQL statement with the query + argument. Must also specify a destination directory with target_dir. :param target_dir: HDFS destination directory where the data from the rdbms will be written :param append: Append data to an existing dataset in HDFS @@ -95,7 +108,14 @@ class SqoopOperator(BaseOperator): :param relaxed_isolation: use read uncommitted isolation level :param hcatalog_database: Specifies the database name for the HCatalog table :param hcatalog_table: The argument value for this option is the HCatalog table + :param create_hcatalog_table: Have sqoop create the hcatalog table passed in or not :param properties: additional JVM properties passed to sqoop + :param extra_import_options: Extra import options to pass as dict. + If a key doesn't have a value, just pass an empty string to it. + Don't include prefix of -- for sqoop options. + :param extra_export_options: Extra export options to pass as dict. + If a key doesn't have a value, just pass an empty string to it. + Don't include prefix of -- for sqoop options. """ super(SqoopOperator, self).__init__(*args, **kwargs) self.conn_id = conn_id @@ -126,10 +146,10 @@ class SqoopOperator(BaseOperator): self.relaxed_isolation = relaxed_isolation self.hcatalog_database = hcatalog_database self.hcatalog_table = hcatalog_table - # No mutable types in the default parameters - if properties is None: - properties = {} + self.create_hcatalog_table = create_hcatalog_table self.properties = properties + self.extra_import_options = extra_import_options + self.extra_export_options = extra_export_options def execute(self, context): """ @@ -156,9 +176,18 @@ class SqoopOperator(BaseOperator): input_lines_terminated_by=self.input_lines_terminated_by, input_optionally_enclosed_by=self.input_optionally_enclosed_by, batch=self.batch, - relaxed_isolation=self.relaxed_isolation) + relaxed_isolation=self.relaxed_isolation, + extra_export_options=self.extra_export_options) elif self.cmd_type == 'import': - if not self.table: + # add create hcatalog table to extra import options if option passed + # if new params are added to constructor can pass them in here so don't modify sqoop_hook for each param + if self.create_hcatalog_table: + self.extra_import_options['create-hcatalog-table'] = '' + + if self.table and self.query: + raise AirflowException('Cannot specify query and table together. Need to specify either or.') + + if self.table: hook.import_table( table=self.table, target_dir=self.target_dir, @@ -168,16 +197,18 @@ class SqoopOperator(BaseOperator): split_by=self.split_by, where=self.where, direct=self.direct, - driver=self.driver) - elif not self.query: + driver=self.driver, + extra_import_options=self.extra_import_options) + elif self.query: hook.import_query( - query=self.table, + query=self.query, target_dir=self.target_dir, append=self.append, file_type=self.file_type, split_by=self.split_by, direct=self.direct, - driver=self.driver) + driver=self.driver, + extra_import_options=self.extra_import_options) else: raise AirflowException( "Provide query or table parameter to import using Sqoop" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/tests/contrib/hooks/test_sqoop_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_sqoop_hook.py b/tests/contrib/hooks/test_sqoop_hook.py index 7c934b9..3eba9ec 100644 --- a/tests/contrib/hooks/test_sqoop_hook.py +++ b/tests/contrib/hooks/test_sqoop_hook.py @@ -13,6 +13,7 @@ # limitations under the License. # +import collections import json import unittest @@ -40,8 +41,8 @@ class TestSqoopHook(unittest.TestCase): _config_export = { 'table': 'domino.export_data_to', 'export_dir': '/hdfs/data/to/be/exported', - 'input_null_string': '\n', - 'input_null_non_string': '\t', + 'input_null_string': '\\n', + 'input_null_non_string': '\\t', 'staging_table': 'database.staging', 'clear_staging_table': True, 'enclosed_by': '"', @@ -50,7 +51,11 @@ class TestSqoopHook(unittest.TestCase): 'input_lines_terminated_by': '\n', 'input_optionally_enclosed_by': '"', 'batch': True, - 'relaxed_isolation': True + 'relaxed_isolation': True, + 'extra_export_options': collections.OrderedDict([ + ('update-key', 'id'), + ('update-mode', 'allowinsert') + ]) } _config_import = { 'target_dir': '/hdfs/data/target/location', @@ -58,7 +63,11 @@ class TestSqoopHook(unittest.TestCase): 'file_type': 'parquet', 'split_by': '\n', 'direct': True, - 'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver' + 'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver', + 'extra_import_options': { + 'hcatalog-storage-stanza': "\"stored as orcfile\"", + 'show': '' + } } _config_json = { @@ -94,10 +103,10 @@ class TestSqoopHook(unittest.TestCase): self.assertEqual(mock_popen.mock_calls[0], call( ['sqoop', 'export', + '-fs', self._config_json['namenode'], '-jt', self._config_json['job_tracker'], '-libjars', self._config_json['libjars'], '-files', self._config_json['files'], - '-fs', self._config_json['namenode'], '-archives', self._config_json['archives'], '--connect', 'rmdbs:5050/schema', '--input-null-string', self._config_export['input_null_string'], @@ -112,9 +121,25 @@ class TestSqoopHook(unittest.TestCase): '--batch', '--relaxed-isolation', '--export-dir', self._config_export['export_dir'], + '--update-key', 'id', + '--update-mode', 'allowinsert', '--table', self._config_export['table']], stderr=-2, stdout=-1)) + def test_submit_none_mappers(self): + """ + Test to check that if value of num_mappers is None, then it shouldn't be in the cmd built. + """ + _config_without_mappers = self._config.copy() + _config_without_mappers['num_mappers'] = None + + hook = SqoopHook(**_config_without_mappers) + cmd = ' '.join(hook._prepare_command()) + self.assertNotIn('--num-mappers', cmd) + def test_submit(self): + """ + Tests to verify that from connection extra option the options are added to the Sqoop command. + """ hook = SqoopHook(**self._config) cmd = ' '.join(hook._prepare_command()) @@ -124,20 +149,16 @@ class TestSqoopHook(unittest.TestCase): self.assertIn("-fs {}".format(self._config_json['namenode']), cmd) if self._config_json['job_tracker']: - self.assertIn("-jt {}".format(self._config_json['job_tracker']), - cmd) + self.assertIn("-jt {}".format(self._config_json['job_tracker']), cmd) if self._config_json['libjars']: - self.assertIn("-libjars {}".format(self._config_json['libjars']), - cmd) + self.assertIn("-libjars {}".format(self._config_json['libjars']), cmd) if self._config_json['files']: self.assertIn("-files {}".format(self._config_json['files']), cmd) if self._config_json['archives']: - self.assertIn( - "-archives {}".format(self._config_json['archives']), cmd - ) + self.assertIn( "-archives {}".format(self._config_json['archives']), cmd) self.assertIn("--hcatalog-database {}".format(self._config['hcatalog_database']), cmd) self.assertIn("--hcatalog-table {}".format(self._config['hcatalog_table']), cmd) @@ -147,11 +168,8 @@ class TestSqoopHook(unittest.TestCase): self.assertIn("--verbose", cmd) if self._config['num_mappers']: - self.assertIn( - "--num-mappers {}".format(self._config['num_mappers']), cmd - ) + self.assertIn( "--num-mappers {}".format(self._config['num_mappers']), cmd) - print(self._config['properties']) for key, value in self._config['properties'].items(): self.assertIn("-D {}={}".format(key, value), cmd) @@ -161,14 +179,15 @@ class TestSqoopHook(unittest.TestCase): hook.export_table(**self._config_export) with self.assertRaises(OSError): - hook.import_table(table='schema.table', - target_dir='/sqoop/example/path') + hook.import_table(table='schema.table', target_dir='/sqoop/example/path') with self.assertRaises(OSError): - hook.import_query(query='SELECT * FROM sometable', - target_dir='/sqoop/example/path') + hook.import_query(query='SELECT * FROM sometable', target_dir='/sqoop/example/path') def test_export_cmd(self): + """ + Tests to verify the hook export command is building correct Sqoop export command. + """ hook = SqoopHook() # The subprocess requires an array but we build the cmd by joining on a space @@ -190,7 +209,9 @@ class TestSqoopHook(unittest.TestCase): input_optionally_enclosed_by=self._config_export[ 'input_optionally_enclosed_by'], batch=self._config_export['batch'], - relaxed_isolation=self._config_export['relaxed_isolation']) + relaxed_isolation=self._config_export['relaxed_isolation'], + extra_export_options=self._config_export['extra_export_options'] + ) ) self.assertIn("--input-null-string {}".format( @@ -209,6 +230,9 @@ class TestSqoopHook(unittest.TestCase): self._config_export['input_lines_terminated_by']), cmd) self.assertIn("--input-optionally-enclosed-by {}".format( self._config_export['input_optionally_enclosed_by']), cmd) + # these options are from the extra export options + self.assertIn("--update-key id", cmd) + self.assertIn("--update-mode allowinsert", cmd) if self._config_export['clear_staging_table']: self.assertIn("--clear-staging-table", cmd) @@ -220,16 +244,22 @@ class TestSqoopHook(unittest.TestCase): self.assertIn("--relaxed-isolation", cmd) def test_import_cmd(self): + """ + Tests to verify the hook import command is building correct Sqoop import command. + """ hook = SqoopHook() # The subprocess requires an array but we build the cmd by joining on a space cmd = ' '.join( - hook._import_cmd(self._config_import['target_dir'], - append=self._config_import['append'], - file_type=self._config_import['file_type'], - split_by=self._config_import['split_by'], - direct=self._config_import['direct'], - driver=self._config_import['driver']) + hook._import_cmd( + self._config_import['target_dir'], + append=self._config_import['append'], + file_type=self._config_import['file_type'], + split_by=self._config_import['split_by'], + direct=self._config_import['direct'], + driver=self._config_import['driver'], + extra_import_options=None + ) ) if self._config_import['append']: @@ -242,10 +272,32 @@ class TestSqoopHook(unittest.TestCase): self._config_import['target_dir']), cmd) self.assertIn('--driver {}'.format(self._config_import['driver']), cmd) - self.assertIn('--split-by {}'.format(self._config_import['split_by']), - cmd) + self.assertIn('--split-by {}'.format(self._config_import['split_by']), cmd) + # these are from extra options, but not passed to this cmd import command + self.assertNotIn('--show', cmd) + self.assertNotIn('hcatalog-storage-stanza \"stored as orcfile\"', cmd) + + cmd = ' '.join( + hook._import_cmd( + target_dir=None, + append=self._config_import['append'], + file_type=self._config_import['file_type'], + split_by=self._config_import['split_by'], + direct=self._config_import['direct'], + driver=self._config_import['driver'], + extra_import_options=self._config_import['extra_import_options'] + ) + ) + + self.assertNotIn('--target-dir', cmd) + # these checks are from the extra import options + self.assertIn('--show', cmd) + self.assertIn('hcatalog-storage-stanza \"stored as orcfile\"', cmd) def test_get_export_format_argument(self): + """ + Tests to verify the hook get format function is building correct Sqoop command with correct format type. + """ hook = SqoopHook() self.assertIn("--as-avrodatafile", hook._get_export_format_argument('avro')) @@ -259,6 +311,9 @@ class TestSqoopHook(unittest.TestCase): hook._get_export_format_argument('unknown') def test_cmd_mask_password(self): + """ + Tests to verify the hook masking function will correctly mask a user password in Sqoop command. + """ hook = SqoopHook() self.assertEqual( hook.cmd_mask_password(['--password', 'supersecret']), http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/tests/contrib/operators/test_sqoop_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_sqoop_operator.py b/tests/contrib/operators/test_sqoop_operator.py index f632805..d9e39b5 100644 --- a/tests/contrib/operators/test_sqoop_operator.py +++ b/tests/contrib/operators/test_sqoop_operator.py @@ -23,6 +23,7 @@ from airflow.exceptions import AirflowException class TestSqoopOperator(unittest.TestCase): _config = { + 'conn_id': 'sqoop_default', 'cmd_type': 'export', 'table': 'target_table', 'query': 'SELECT * FROM schema.table', @@ -46,10 +47,19 @@ class TestSqoopOperator(unittest.TestCase): 'relaxed_isolation': True, 'direct': True, 'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver', + 'create_hcatalog_table': True, 'hcatalog_database': 'hive_database', 'hcatalog_table': 'hive_table', 'properties': { 'mapred.map.max.attempts': '1' + }, + 'extra_import_options': { + 'hcatalog-storage-stanza': "\"stored as orcfile\"", + 'show': '' + }, + 'extra_export_options': { + 'update-key': 'id', + 'update-mode': 'allowinsert' } } @@ -61,15 +71,18 @@ class TestSqoopOperator(unittest.TestCase): } self.dag = DAG('test_dag_id', default_args=args) - def test_execute(self, conn_id='sqoop_default'): + def test_execute(self): + """ + Tests to verify values of the SqoopOperator match that passed in from the config. + """ operator = SqoopOperator( task_id='sqoop_job', dag=self.dag, **self._config ) - self.assertEqual(conn_id, operator.conn_id) - + self.assertEqual(self._config['conn_id'], operator.conn_id) + self.assertEqual(self._config['query'], operator.query) self.assertEqual(self._config['cmd_type'], operator.cmd_type) self.assertEqual(self._config['table'], operator.table) self.assertEqual(self._config['target_dir'], operator.target_dir) @@ -77,28 +90,117 @@ class TestSqoopOperator(unittest.TestCase): self.assertEqual(self._config['file_type'], operator.file_type) self.assertEqual(self._config['num_mappers'], operator.num_mappers) self.assertEqual(self._config['split_by'], operator.split_by) - self.assertEqual(self._config['input_null_string'], - operator.input_null_string) - self.assertEqual(self._config['input_null_non_string'], - operator.input_null_non_string) + self.assertEqual(self._config['input_null_string'], operator.input_null_string) + self.assertEqual(self._config['input_null_non_string'], operator.input_null_non_string) self.assertEqual(self._config['staging_table'], operator.staging_table) - self.assertEqual(self._config['clear_staging_table'], - operator.clear_staging_table) + self.assertEqual(self._config['clear_staging_table'], operator.clear_staging_table) self.assertEqual(self._config['batch'], operator.batch) - self.assertEqual(self._config['relaxed_isolation'], - operator.relaxed_isolation) + self.assertEqual(self._config['relaxed_isolation'], operator.relaxed_isolation) self.assertEqual(self._config['direct'], operator.direct) self.assertEqual(self._config['driver'], operator.driver) self.assertEqual(self._config['properties'], operator.properties) self.assertEqual(self._config['hcatalog_database'], operator.hcatalog_database) self.assertEqual(self._config['hcatalog_table'], operator.hcatalog_table) + self.assertEqual(self._config['create_hcatalog_table'], operator.create_hcatalog_table) + self.assertEqual(self._config['extra_import_options'], operator.extra_import_options) + self.assertEqual(self._config['extra_export_options'], operator.extra_export_options) + + # the following are meant to be more of examples + sqoop_import_op = SqoopOperator( + task_id='sqoop_import_using_table', + cmd_type='import', + conn_id='sqoop_default', + table='company', + verbose=True, + num_mappers=8, + hcatalog_database='default', + hcatalog_table='import_table_1', + create_hcatalog_table=True, + extra_import_options={'hcatalog-storage-stanza': "\"stored as orcfile\""}, + dag=self.dag + ) + + sqoop_import_op_qry = SqoopOperator( + task_id='sqoop_import_using_query', + cmd_type='import', + conn_id='sqoop_default', + query='select name, age from company where $CONDITIONS', + split_by='age', # the mappers will pass in values to the $CONDITIONS based on the field you select to split by + verbose=True, + num_mappers=None, + hcatalog_database='default', + hcatalog_table='import_table_2', + create_hcatalog_table=True, + extra_import_options={'hcatalog-storage-stanza': "\"stored as orcfile\""}, + dag=self.dag + ) + + sqoop_import_op_with_partition = SqoopOperator( + task_id='sqoop_import_with_partition', + cmd_type='import', + conn_id='sqoop_default', + table='company', + verbose=True, + num_mappers=None, + hcatalog_database='default', + hcatalog_table='import_table_3', + create_hcatalog_table=True, + extra_import_options={ + 'hcatalog-storage-stanza': "\"stored as orcfile\"", + 'hive-partition-key': 'day', + 'hive-partition-value': '2017-10-18'}, + dag=self.dag + ) + + sqoop_export_op_name = SqoopOperator( + task_id='sqoop_export_tablename', + cmd_type='export', + conn_id='sqoop_default', + table='rbdms_export_table_1', + verbose=True, + num_mappers=None, + hcatalog_database='default', + hcatalog_table='hive_export_table_1', + extra_export_options=None, + dag=self.dag + ) + + sqoop_export_op_path = SqoopOperator( + task_id='sqoop_export_tablepath', + cmd_type='export', + conn_id='sqoop_default', + table='rbdms_export_table_2', + export_dir='/user/hive/warehouse/export_table_2', + direct=True, # speeds up for data transfer + verbose=True, + num_mappers=None, + extra_export_options=None, + dag=self.dag + ) def test_invalid_cmd_type(self): + """ + Tests to verify if the cmd_type is not import or export, an exception is raised. + """ operator = SqoopOperator(task_id='sqoop_job', dag=self.dag, cmd_type='invalid') with self.assertRaises(AirflowException): operator.execute({}) + def test_invalid_import_options(self): + """ + Tests to verify if a user passes both a query and a table then an exception is raised. + """ + import_query_and_table_configs = self._config.copy() + import_query_and_table_configs['cmd_type'] = 'import' + operator = SqoopOperator( + task_id='sqoop_job', + dag=self.dag, + **import_query_and_table_configs + ) + with self.assertRaises(AirflowException): + operator.execute({}) + if __name__ == '__main__': unittest.main()
