This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fc917af Fix Minor Bugs in Apache Sqoop Hook and Operator (#16350)
fc917af is described below
commit fc917af8b49a914d4404faebbec807679f0626af
Author: Michele Zanchi <[email protected]>
AuthorDate: Sat Jul 10 13:09:47 2021 +0200
Fix Minor Bugs in Apache Sqoop Hook and Operator (#16350)
---
airflow/providers/apache/sqoop/hooks/sqoop.py | 17 ++++++++++-
airflow/providers/apache/sqoop/operators/sqoop.py | 6 ++++
tests/providers/apache/sqoop/hooks/test_sqoop.py | 34 ++++++++++++++++++++--
.../providers/apache/sqoop/operators/test_sqoop.py | 2 ++
4 files changed, 56 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/apache/sqoop/hooks/sqoop.py b/airflow/providers/apache/sqoop/hooks/sqoop.py
index 21684a5..1f6b182 100644
--- a/airflow/providers/apache/sqoop/hooks/sqoop.py
+++ b/airflow/providers/apache/sqoop/hooks/sqoop.py
@@ -150,7 +150,11 @@ class SqoopHook(BaseHook):
if self.conn.port:
connect_str += f":{self.conn.port}"
if self.conn.schema:
- connect_str += f"/{self.conn.schema}"
+ self.log.info("CONNECTION TYPE %s", self.conn.conn_type)
+ if self.conn.conn_type != 'mssql':
+ connect_str += f"/{self.conn.schema}"
+ else:
+ connect_str += f";databaseName={self.conn.schema}"
connection_cmd += ["--connect", connect_str]
return connection_cmd
@@ -218,12 +222,14 @@ class SqoopHook(BaseHook):
direct: bool = False,
driver: Any = None,
extra_import_options: Optional[Dict[str, Any]] = None,
+ schema: Optional[str] = None,
) -> Any:
"""
Imports table from remote location to target dir. Arguments are
copies of direct sqoop command line arguments
:param table: Table to read
+ :param schema: Schema name
:param target_dir: HDFS destination dir
:param append: Append data to an existing dataset in HDFS
:param file_type: "avro", "sequence", "text" or "parquet".
@@ -245,6 +251,8 @@ class SqoopHook(BaseHook):
cmd += ["--columns", columns]
if where:
cmd += ["--where", where]
+ if schema:
+ cmd += ["--", "--schema", schema]
self.popen(cmd)
@@ -295,6 +303,7 @@ class SqoopHook(BaseHook):
batch: bool = False,
relaxed_isolation: bool = False,
extra_export_options: Optional[Dict[str, Any]] = None,
+ schema: Optional[str] = None,
) -> List[str]:
cmd = self._prepare_command(export=True)
@@ -344,6 +353,9 @@ class SqoopHook(BaseHook):
# The required option
cmd += ["--table", table]
+ if schema:
+ cmd += ["--", "--schema", schema]
+
return cmd
def export_table(
@@ -362,12 +374,14 @@ class SqoopHook(BaseHook):
batch: bool = False,
relaxed_isolation: bool = False,
extra_export_options: Optional[Dict[str, Any]] = None,
+ schema: Optional[str] = None,
) -> None:
"""
Exports Hive table to remote location. Arguments are copies of direct
sqoop command line Arguments
:param table: Table remote destination
+ :param schema: Schema name
:param export_dir: Hive table to export
:param input_null_string: The string to be interpreted as null for
string columns
@@ -404,6 +418,7 @@ class SqoopHook(BaseHook):
batch,
relaxed_isolation,
extra_export_options,
+ schema,
)
self.popen(cmd)
diff --git a/airflow/providers/apache/sqoop/operators/sqoop.py b/airflow/providers/apache/sqoop/operators/sqoop.py
index 242ed25..226c777 100644
--- a/airflow/providers/apache/sqoop/operators/sqoop.py
+++ b/airflow/providers/apache/sqoop/operators/sqoop.py
@@ -34,6 +34,7 @@ class SqoopOperator(BaseOperator):
:param conn_id: str
:param cmd_type: str specify command to execute "export" or "import"
+ :param schema: Schema name
:param table: Table to read
:param query: Import result of arbitrary SQL query. Instead of using the table,
columns and where arguments, you can specify a SQL statement with the query
@@ -104,6 +105,7 @@ class SqoopOperator(BaseOperator):
'extra_export_options',
'hcatalog_database',
'hcatalog_table',
+ 'schema',
)
ui_color = '#7D8CA4'
@@ -142,6 +144,7 @@ class SqoopOperator(BaseOperator):
create_hcatalog_table: bool = False,
extra_import_options: Optional[Dict[str, Any]] = None,
extra_export_options: Optional[Dict[str, Any]] = None,
+ schema: Optional[str] = None,
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
@@ -178,6 +181,7 @@ class SqoopOperator(BaseOperator):
self.extra_import_options = extra_import_options or {}
self.extra_export_options = extra_export_options or {}
self.hook: Optional[SqoopHook] = None
+ self.schema = schema
def execute(self, context: Dict[str, Any]) -> None:
"""Execute sqoop job"""
@@ -200,6 +204,7 @@ class SqoopOperator(BaseOperator):
batch=self.batch,
relaxed_isolation=self.relaxed_isolation,
extra_export_options=self.extra_export_options,
+ schema=self.schema,
)
elif self.cmd_type == 'import':
# add create hcatalog table to extra import options if option passed
@@ -223,6 +228,7 @@ class SqoopOperator(BaseOperator):
direct=self.direct,
driver=self.driver,
extra_import_options=self.extra_import_options,
+ schema=self.schema,
)
elif self.query:
self.hook.import_query(
diff --git a/tests/providers/apache/sqoop/hooks/test_sqoop.py b/tests/providers/apache/sqoop/hooks/test_sqoop.py
index 8c72ca9..c6a40ed 100644
--- a/tests/providers/apache/sqoop/hooks/test_sqoop.py
+++ b/tests/providers/apache/sqoop/hooks/test_sqoop.py
@@ -42,7 +42,7 @@ class TestSqoopHook(unittest.TestCase):
'hcatalog_table': 'hive_table',
}
_config_export = {
- 'table': 'domino.export_data_to',
+ 'table': 'export_data_to',
'export_dir': '/hdfs/data/to/be/exported',
'input_null_string': '\\n',
'input_null_non_string': '\\t',
@@ -58,6 +58,7 @@ class TestSqoopHook(unittest.TestCase):
'extra_export_options': collections.OrderedDict(
[('update-key', 'id'), ('update-mode', 'allowinsert'),
('fetch-size', 1)]
),
+ 'schema': 'domino',
}
_config_import = {
'target_dir': '/hdfs/data/target/location',
@@ -92,6 +93,16 @@ class TestSqoopHook(unittest.TestCase):
extra=json.dumps(self._config_json),
)
)
+ db.merge_conn(
+ Connection(
+ conn_id='sqoop_test_mssql',
+ conn_type='mssql',
+ schema='schema',
+ host='rmdbs',
+ port=5050,
+ extra=None,
+ )
+ )
@patch('subprocess.Popen')
def test_popen(self, mock_popen):
@@ -156,6 +167,9 @@ class TestSqoopHook(unittest.TestCase):
str(self._config_export['extra_export_options'].get('fetch-size')),
'--table',
self._config_export['table'],
+ '--',
+ '--schema',
+ self._config_export['schema'],
],
stderr=-2,
stdout=-1,
@@ -215,7 +229,7 @@ class TestSqoopHook(unittest.TestCase):
hook.export_table(**self._config_export)
with pytest.raises(OSError):
- hook.import_table(table='schema.table', target_dir='/sqoop/example/path')
+ hook.import_table(table='table', target_dir='/sqoop/example/path', schema='schema')
with pytest.raises(OSError):
hook.import_query(query='SELECT * FROM sometable', target_dir='/sqoop/example/path')
@@ -243,6 +257,7 @@ class TestSqoopHook(unittest.TestCase):
batch=self._config_export['batch'],
relaxed_isolation=self._config_export['relaxed_isolation'],
extra_export_options=self._config_export['extra_export_options'],
+ schema=self._config_export['schema'],
)
)
@@ -272,6 +287,9 @@ class TestSqoopHook(unittest.TestCase):
assert "--update-mode" in cmd
assert "--fetch-size" in cmd
+ if self._config_export['schema']:
+ assert "-- --schema" in cmd
+
def test_import_cmd(self):
"""
Tests to verify the hook import command is building correct Sqoop import command.
@@ -345,3 +363,15 @@ class TestSqoopHook(unittest.TestCase):
cmd = ['--target', 'targettable']
assert hook.cmd_mask_password(cmd) == cmd
+
+ def test_connection_string_preparation(self):
+ """
Tests to verify the hook creates the connection string correctly for mssql and non-mssql DB connections.
"""
+ # Case mssql
+ hook = SqoopHook(conn_id='sqoop_test_mssql')
assert f"{hook.conn.host}:{hook.conn.port};databaseName={hook.conn.schema}" in hook._prepare_command()
+
+ # Case no mssql
+ hook = SqoopHook(conn_id='sqoop_test')
assert f"{hook.conn.host}:{hook.conn.port}/{hook.conn.schema}" in hook._prepare_command()
diff --git a/tests/providers/apache/sqoop/operators/test_sqoop.py b/tests/providers/apache/sqoop/operators/test_sqoop.py
index 882d13a..9b8b205 100644
--- a/tests/providers/apache/sqoop/operators/test_sqoop.py
+++ b/tests/providers/apache/sqoop/operators/test_sqoop.py
@@ -59,6 +59,7 @@ class TestSqoopOperator(unittest.TestCase):
'properties': {'mapred.map.max.attempts': '1'},
'extra_import_options': {'hcatalog-storage-stanza': "\"stored as orcfile\"", 'show': ''},
'extra_export_options': {'update-key': 'id', 'update-mode': 'allowinsert', 'fetch-size': 1},
+ 'schema': 'myschema',
}
def setUp(self):
@@ -94,6 +95,7 @@ class TestSqoopOperator(unittest.TestCase):
assert self._config['create_hcatalog_table'] == operator.create_hcatalog_table
assert self._config['extra_import_options'] == operator.extra_import_options
assert self._config['extra_export_options'] == operator.extra_export_options
+ assert self._config['schema'] == operator.schema
# the following are meant to be more of examples
SqoopOperator(