This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 59f5f58e2b Validate SqoopHook connection string and disable extra 
options from public hook methods (#33039)
59f5f58e2b is described below

commit 59f5f58e2bfcc8c12b0b44d6c6ce5cfed60b8867
Author: Pankaj Koti <[email protected]>
AuthorDate: Fri Aug 4 00:48:11 2023 +0530

    Validate SqoopHook connection string and disable extra options from public 
hook methods (#33039)
    
    * Validate SqoopHook connection string and disable extra options from hook 
methods
    
    Check that the connection string constructed using the connection's
    `host`, `port` and `schema` does not contain query params as it is not
    intended. Additionally, also disable the `extra_import_options` and
    `extra_export_options` arguments accepted directly by the hook
    methods but accept it as a param via the hook constructor when
    initialising the hook or by passing it in hook_params when initialising
    the hook from operators.
    
    * Propagate missing hook param changes to the operator
    
    * Remove test for invalid port with query param as ports are integers and 
not accepted by databases
    
    * Add 4.0.0 to provider.yaml
---
 airflow/providers/apache/sqoop/CHANGELOG.rst       | 11 +++++
 airflow/providers/apache/sqoop/hooks/sqoop.py      | 37 ++++++---------
 airflow/providers/apache/sqoop/operators/sqoop.py  | 30 +++++--------
 airflow/providers/apache/sqoop/provider.yaml       |  1 +
 tests/providers/apache/sqoop/hooks/test_sqoop.py   | 52 +++++++++++++++-------
 .../providers/apache/sqoop/operators/test_sqoop.py | 16 +++----
 6 files changed, 79 insertions(+), 68 deletions(-)

diff --git a/airflow/providers/apache/sqoop/CHANGELOG.rst 
b/airflow/providers/apache/sqoop/CHANGELOG.rst
index d824b7dd1d..87823fcb32 100644
--- a/airflow/providers/apache/sqoop/CHANGELOG.rst
+++ b/airflow/providers/apache/sqoop/CHANGELOG.rst
@@ -28,6 +28,17 @@
 Changelog
 ---------
 
+4.0.0
+.....
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+The ``extra_import_options`` parameter in the ``import_table`` & 
``import_query`` methods
+and the ``extra_export_options`` in the ``export_table`` methods of the 
``SqoopHook``
+are no longer accepted as arguments for those methods. These should instead be 
passed
+as ``extra_options`` while initializing the Hook or via ``extra_options`` 
parameter to the
+operator which instantiates the hook with those given ``extra_options`` 
dictionary.
 
 3.2.1
 .....
diff --git a/airflow/providers/apache/sqoop/hooks/sqoop.py 
b/airflow/providers/apache/sqoop/hooks/sqoop.py
index 297c567c19..263d7a616b 100644
--- a/airflow/providers/apache/sqoop/hooks/sqoop.py
+++ b/airflow/providers/apache/sqoop/hooks/sqoop.py
@@ -46,6 +46,9 @@ class SqoopHook(BaseHook):
     :param num_mappers: Number of map tasks to import in parallel.
     :param properties: Properties to set via the -D argument
     :param libjars: Optional Comma separated jar files to include in the 
classpath.
+    :param extra_options:  Extra import/export options to pass as dict.
+        If a key doesn't have a value, just pass an empty string to it.
+        Don't include prefix of -- for sqoop options.
     """
 
     conn_name_attr = "conn_id"
@@ -62,6 +65,7 @@ class SqoopHook(BaseHook):
         hcatalog_table: str | None = None,
         properties: dict[str, Any] | None = None,
         libjars: str | None = None,
+        extra_options: dict[str, Any] | None = None,
     ) -> None:
         # No mutable types in the default parameters
         super().__init__()
@@ -79,6 +83,7 @@ class SqoopHook(BaseHook):
         self.num_mappers = num_mappers
         self.properties = properties or {}
         self.sub_process_pid: int
+        self._extra_options = extra_options
         self.log.info("Using connection to: %s:%s/%s", self.conn.host, 
self.conn.port, self.conn.schema)
 
     def get_conn(self) -> Any:
@@ -113,9 +118,6 @@ class SqoopHook(BaseHook):
                 raise AirflowException(f"Sqoop command failed: {masked_cmd}")
 
     def _prepare_command(self, export: bool = False) -> list[str]:
-        if "?" in self.conn.host:
-            raise ValueError("The sqoop connection host should not contain a 
'?' character")
-
         sqoop_cmd_type = "export" if export else "import"
         connection_cmd = ["sqoop", sqoop_cmd_type]
 
@@ -156,6 +158,8 @@ class SqoopHook(BaseHook):
                 connect_str += f"/{self.conn.schema}"
             else:
                 connect_str += f";databaseName={self.conn.schema}"
+        if "?" in connect_str:
+            raise ValueError("The sqoop connection string should not contain a 
'?' character")
         connection_cmd += ["--connect", connect_str]
 
         return connection_cmd
@@ -181,7 +185,6 @@ class SqoopHook(BaseHook):
         split_by: str | None,
         direct: bool | None,
         driver: Any,
-        extra_import_options: Any,
     ) -> list[str]:
 
         cmd = self._prepare_command(export=False)
@@ -203,8 +206,8 @@ class SqoopHook(BaseHook):
         if driver:
             cmd += ["--driver", driver]
 
-        if extra_import_options:
-            for key, value in extra_import_options.items():
+        if self._extra_options:
+            for key, value in self._extra_options.items():
                 cmd += [f"--{key}"]
                 if value:
                     cmd += [str(value)]
@@ -222,7 +225,6 @@ class SqoopHook(BaseHook):
         where: str | None = None,
         direct: bool = False,
         driver: Any = None,
-        extra_import_options: dict[str, Any] | None = None,
         schema: str | None = None,
     ) -> Any:
         """Import table from remote location to target dir.
@@ -240,11 +242,8 @@ class SqoopHook(BaseHook):
         :param where: WHERE clause to use during import
         :param direct: Use direct connector if exists for the database
         :param driver: Manually specify JDBC driver class to use
-        :param extra_import_options: Extra import options to pass as dict.
-            If a key doesn't have a value, just pass an empty string to it.
-            Don't include prefix of -- for sqoop options.
         """
-        cmd = self._import_cmd(target_dir, append, file_type, split_by, 
direct, driver, extra_import_options)
+        cmd = self._import_cmd(target_dir, append, file_type, split_by, 
direct, driver)
 
         cmd += ["--table", table]
 
@@ -266,7 +265,6 @@ class SqoopHook(BaseHook):
         split_by: str | None = None,
         direct: bool | None = None,
         driver: Any | None = None,
-        extra_import_options: dict[str, Any] | None = None,
     ) -> Any:
         """Import a specific query from the rdbms to hdfs.
 
@@ -278,11 +276,8 @@ class SqoopHook(BaseHook):
         :param split_by: Column of the table used to split work units
         :param direct: Use direct import fast path
         :param driver: Manually specify JDBC driver class to use
-        :param extra_import_options: Extra import options to pass as dict.
-            If a key doesn't have a value, just pass an empty string to it.
-            Don't include prefix of -- for sqoop options.
         """
-        cmd = self._import_cmd(target_dir, append, file_type, split_by, 
direct, driver, extra_import_options)
+        cmd = self._import_cmd(target_dir, append, file_type, split_by, 
direct, driver)
         cmd += ["--query", query]
 
         self.popen(cmd)
@@ -302,7 +297,6 @@ class SqoopHook(BaseHook):
         input_optionally_enclosed_by: str | None = None,
         batch: bool = False,
         relaxed_isolation: bool = False,
-        extra_export_options: dict[str, Any] | None = None,
         schema: str | None = None,
     ) -> list[str]:
 
@@ -344,8 +338,8 @@ class SqoopHook(BaseHook):
         if export_dir:
             cmd += ["--export-dir", export_dir]
 
-        if extra_export_options:
-            for key, value in extra_export_options.items():
+        if self._extra_options:
+            for key, value in self._extra_options.items():
                 cmd += [f"--{key}"]
                 if value:
                     cmd += [str(value)]
@@ -373,7 +367,6 @@ class SqoopHook(BaseHook):
         input_optionally_enclosed_by: str | None = None,
         batch: bool = False,
         relaxed_isolation: bool = False,
-        extra_export_options: dict[str, Any] | None = None,
         schema: str | None = None,
     ) -> None:
         """Export Hive table to remote location.
@@ -399,9 +392,6 @@ class SqoopHook(BaseHook):
         :param batch: Use batch mode for underlying statement execution
         :param relaxed_isolation: Transaction isolation to read uncommitted
             for the mappers
-        :param extra_export_options: Extra export options to pass as dict.
-            If a key doesn't have a value, just pass an empty string to it.
-            Don't include prefix of -- for sqoop options.
         """
         cmd = self._export_cmd(
             table,
@@ -417,7 +407,6 @@ class SqoopHook(BaseHook):
             input_optionally_enclosed_by,
             batch,
             relaxed_isolation,
-            extra_export_options,
             schema,
         )
 
diff --git a/airflow/providers/apache/sqoop/operators/sqoop.py 
b/airflow/providers/apache/sqoop/operators/sqoop.py
index c421a53dd4..c662afcc71 100644
--- a/airflow/providers/apache/sqoop/operators/sqoop.py
+++ b/airflow/providers/apache/sqoop/operators/sqoop.py
@@ -76,10 +76,7 @@ class SqoopOperator(BaseOperator):
     :param create_hcatalog_table: Have sqoop create the hcatalog table passed
         in or not
     :param properties: additional JVM properties passed to sqoop
-    :param extra_import_options: Extra import options to pass as dict.
-        If a key doesn't have a value, just pass an empty string to it.
-        Don't include prefix of -- for sqoop options.
-    :param extra_export_options: Extra export options to pass as dict.
+    :param extra_options:  Extra import/export options to pass as dict to the 
SqoopHook.
         If a key doesn't have a value, just pass an empty string to it.
         Don't include prefix of -- for sqoop options.
     :param libjars: Optional Comma separated jar files to include in the 
classpath.
@@ -105,9 +102,8 @@ class SqoopOperator(BaseOperator):
         "input_lines_terminated_by",
         "input_optionally_enclosed_by",
         "properties",
-        "extra_import_options",
+        "extra_options",
         "driver",
-        "extra_export_options",
         "hcatalog_database",
         "hcatalog_table",
         "schema",
@@ -148,8 +144,7 @@ class SqoopOperator(BaseOperator):
         hcatalog_database: str | None = None,
         hcatalog_table: str | None = None,
         create_hcatalog_table: bool = False,
-        extra_import_options: dict[str, Any] | None = None,
-        extra_export_options: dict[str, Any] | None = None,
+        extra_options: dict[str, Any] | None = None,
         schema: str | None = None,
         libjars: str | None = None,
         **kwargs: Any,
@@ -185,8 +180,7 @@ class SqoopOperator(BaseOperator):
         self.hcatalog_table = hcatalog_table
         self.create_hcatalog_table = create_hcatalog_table
         self.properties = properties
-        self.extra_import_options = extra_import_options or {}
-        self.extra_export_options = extra_export_options or {}
+        self.extra_options = extra_options or {}
         self.hook: SqoopHook | None = None
         self.schema = schema
         self.libjars = libjars
@@ -211,16 +205,9 @@ class SqoopOperator(BaseOperator):
                 input_optionally_enclosed_by=self.input_optionally_enclosed_by,
                 batch=self.batch,
                 relaxed_isolation=self.relaxed_isolation,
-                extra_export_options=self.extra_export_options,
                 schema=self.schema,
             )
         elif self.cmd_type == "import":
-            # add create hcatalog table to extra import options if option 
passed
-            # if new params are added to constructor can pass them in here
-            # so don't modify sqoop_hook for each param
-            if self.create_hcatalog_table:
-                self.extra_import_options["create-hcatalog-table"] = ""
-
             if self.table and self.query:
                 raise AirflowException("Cannot specify query and table 
together. Need to specify either or.")
 
@@ -235,7 +222,6 @@ class SqoopOperator(BaseOperator):
                     where=self.where,
                     direct=self.direct,
                     driver=self.driver,
-                    extra_import_options=self.extra_import_options,
                     schema=self.schema,
                 )
             elif self.query:
@@ -247,7 +233,6 @@ class SqoopOperator(BaseOperator):
                     split_by=self.split_by,
                     direct=self.direct,
                     driver=self.driver,
-                    extra_import_options=self.extra_import_options,
                 )
             else:
                 raise AirflowException("Provide query or table parameter to 
import using Sqoop")
@@ -261,6 +246,12 @@ class SqoopOperator(BaseOperator):
         os.killpg(os.getpgid(self.hook.sub_process_pid), signal.SIGTERM)
 
     def _get_hook(self) -> SqoopHook:
+        """Returns a SqoopHook instance."""
+        # Add `create-hcatalog-table` to extra options if option passed to 
operator in case of `import`
+        # command. Similarly, if new parameters are added to the operator, you 
can pass them to
+        # `extra_options` so that you don't need to modify `SqoopHook` for 
each new parameter.
+        if self.cmd_type == "import" and self.create_hcatalog_table:
+            self.extra_options["create-hcatalog-table"] = ""
         return SqoopHook(
             conn_id=self.conn_id,
             verbose=self.verbose,
@@ -269,4 +260,5 @@ class SqoopOperator(BaseOperator):
             hcatalog_table=self.hcatalog_table,
             properties=self.properties,
             libjars=self.libjars,
+            extra_options=self.extra_options,
         )
diff --git a/airflow/providers/apache/sqoop/provider.yaml 
b/airflow/providers/apache/sqoop/provider.yaml
index b358d73b2d..60088ab22c 100644
--- a/airflow/providers/apache/sqoop/provider.yaml
+++ b/airflow/providers/apache/sqoop/provider.yaml
@@ -23,6 +23,7 @@ description: |
 
 suspended: false
 versions:
+  - 4.0.0
   - 3.2.1
   - 3.2.0
   - 3.1.1
diff --git a/tests/providers/apache/sqoop/hooks/test_sqoop.py 
b/tests/providers/apache/sqoop/hooks/test_sqoop.py
index 0075ebec9b..3c7d4fb5e8 100644
--- a/tests/providers/apache/sqoop/hooks/test_sqoop.py
+++ b/tests/providers/apache/sqoop/hooks/test_sqoop.py
@@ -40,6 +40,11 @@ class TestSqoopHook:
         "hcatalog_database": "hive_database",
         "hcatalog_table": "hive_table",
     }
+    _config_export_extra_options = {
+        "extra_options": collections.OrderedDict(
+            [("update-key", "id"), ("update-mode", "allowinsert"), 
("fetch-size", 1)]
+        ),
+    }
     _config_export = {
         "table": "export_data_to",
         "export_dir": "/hdfs/data/to/be/exported",
@@ -54,11 +59,15 @@ class TestSqoopHook:
         "input_optionally_enclosed_by": '"',
         "batch": True,
         "relaxed_isolation": True,
-        "extra_export_options": collections.OrderedDict(
-            [("update-key", "id"), ("update-mode", "allowinsert"), 
("fetch-size", 1)]
-        ),
         "schema": "domino",
     }
+    _config_import_extra_options = {
+        "extra_options": {
+            "hcatalog-storage-stanza": '"stored as orcfile"',
+            "show": "",
+            "fetch-size": 1,
+        },
+    }
     _config_import = {
         "target_dir": "/hdfs/data/target/location",
         "append": True,
@@ -66,11 +75,6 @@ class TestSqoopHook:
         "split_by": "\n",
         "direct": True,
         "driver": "com.microsoft.jdbc.sqlserver.SQLServerDriver",
-        "extra_import_options": {
-            "hcatalog-storage-stanza": '"stored as orcfile"',
-            "show": "",
-            "fetch-size": 1,
-        },
     }
 
     _config_json = {
@@ -111,6 +115,16 @@ class TestSqoopHook:
                 extra=None,
             )
         )
+        db.merge_conn(
+            Connection(
+                conn_id="invalid_schema_conn",
+                conn_type="mssql",
+                schema="schema?query_param1=value1",
+                host="rmdbs",
+                port=5050,
+                extra=None,
+            )
+        )
 
     @patch("subprocess.Popen")
     def test_popen(self, mock_popen):
@@ -126,7 +140,7 @@ class TestSqoopHook:
         mock_popen.return_value.__enter__.return_value = mock_proc
 
         # When
-        hook = SqoopHook(conn_id="sqoop_test", libjars="/path/to/jars")
+        hook = SqoopHook(conn_id="sqoop_test", libjars="/path/to/jars", 
**self._config_export_extra_options)
         hook.export_table(**self._config_export)
 
         # Then
@@ -172,7 +186,7 @@ class TestSqoopHook:
                 "--update-mode",
                 "allowinsert",
                 "--fetch-size",
-                
str(self._config_export["extra_export_options"].get("fetch-size")),
+                
str(self._config_export_extra_options["extra_options"].get("fetch-size")),
                 "--table",
                 self._config_export["table"],
                 "--",
@@ -243,7 +257,7 @@ class TestSqoopHook:
         """
         Tests to verify the hook export command is building correct Sqoop 
export command.
         """
-        hook = SqoopHook()
+        hook = SqoopHook(**self._config_export_extra_options)
 
         # The subprocess requires an array but we build the cmd by joining on 
a space
         cmd = " ".join(
@@ -261,7 +275,6 @@ class TestSqoopHook:
                 
input_optionally_enclosed_by=self._config_export["input_optionally_enclosed_by"],
                 batch=self._config_export["batch"],
                 relaxed_isolation=self._config_export["relaxed_isolation"],
-                
extra_export_options=self._config_export["extra_export_options"],
                 schema=self._config_export["schema"],
             )
         )
@@ -287,7 +300,7 @@ class TestSqoopHook:
         if self._config_export["relaxed_isolation"]:
             assert "--relaxed-isolation" in cmd
 
-        if self._config_export["extra_export_options"]:
+        if self._config_export_extra_options["extra_options"]:
             assert "--update-key" in cmd
             assert "--update-mode" in cmd
             assert "--fetch-size" in cmd
@@ -299,6 +312,7 @@ class TestSqoopHook:
         """
         Tests to verify the hook import command is building correct Sqoop 
import command.
         """
+        # Test hook without extra import options
         hook = SqoopHook()
 
         # The subprocess requires an array but we build the cmd by joining on 
a space
@@ -310,7 +324,6 @@ class TestSqoopHook:
                 split_by=self._config_import["split_by"],
                 direct=self._config_import["direct"],
                 driver=self._config_import["driver"],
-                extra_import_options=None,
             )
         )
 
@@ -328,6 +341,9 @@ class TestSqoopHook:
         assert "--show" not in cmd
         assert 'hcatalog-storage-stanza "stored as orcfile"' not in cmd
 
+        # Test hook with extra import options
+        hook = SqoopHook(**self._config_import_extra_options)
+
         cmd = " ".join(
             hook._import_cmd(
                 target_dir=None,
@@ -336,7 +352,6 @@ class TestSqoopHook:
                 split_by=self._config_import["split_by"],
                 direct=self._config_import["direct"],
                 driver=self._config_import["driver"],
-                
extra_import_options=self._config_import["extra_import_options"],
             )
         )
 
@@ -383,5 +398,10 @@ class TestSqoopHook:
 
     def test_invalid_host(self):
         hook = SqoopHook(conn_id="invalid_host_conn")
-        with pytest.raises(ValueError, match="host should not contain a"):
+        with pytest.raises(ValueError, match="should not contain a"):
+            hook._prepare_command()
+
+    def test_invalid_schema(self):
+        hook = SqoopHook(conn_id="invalid_schema_conn")
+        with pytest.raises(ValueError, match="should not contain a"):
             hook._prepare_command()
diff --git a/tests/providers/apache/sqoop/operators/test_sqoop.py 
b/tests/providers/apache/sqoop/operators/test_sqoop.py
index 883422bf20..ed3d0878c8 100644
--- a/tests/providers/apache/sqoop/operators/test_sqoop.py
+++ b/tests/providers/apache/sqoop/operators/test_sqoop.py
@@ -56,8 +56,7 @@ class TestSqoopOperator:
         "hcatalog_database": "hive_database",
         "hcatalog_table": "hive_table",
         "properties": {"mapred.map.max.attempts": "1"},
-        "extra_import_options": {"hcatalog-storage-stanza": '"stored as 
orcfile"', "show": ""},
-        "extra_export_options": {"update-key": "id", "update-mode": 
"allowinsert", "fetch-size": 1},
+        "extra_options": {"update-key": "id", "update-mode": "allowinsert", 
"fetch-size": 1},
         "schema": "myschema",
     }
 
@@ -92,8 +91,7 @@ class TestSqoopOperator:
         assert self._config["hcatalog_database"] == operator.hcatalog_database
         assert self._config["hcatalog_table"] == operator.hcatalog_table
         assert self._config["create_hcatalog_table"] == 
operator.create_hcatalog_table
-        assert self._config["extra_import_options"] == 
operator.extra_import_options
-        assert self._config["extra_export_options"] == 
operator.extra_export_options
+        assert self._config["extra_options"] == operator.extra_options
         assert self._config["schema"] == operator.schema
 
         # the following are meant to be more of examples
@@ -107,7 +105,7 @@ class TestSqoopOperator:
             hcatalog_database="default",
             hcatalog_table="import_table_1",
             create_hcatalog_table=True,
-            extra_import_options={"hcatalog-storage-stanza": '"stored as 
orcfile"'},
+            extra_options={"hcatalog-storage-stanza": '"stored as orcfile"'},
             dag=self.dag,
         )
 
@@ -123,7 +121,7 @@ class TestSqoopOperator:
             hcatalog_database="default",
             hcatalog_table="import_table_2",
             create_hcatalog_table=True,
-            extra_import_options={"hcatalog-storage-stanza": '"stored as 
orcfile"'},
+            extra_options={"hcatalog-storage-stanza": '"stored as orcfile"'},
             dag=self.dag,
         )
 
@@ -137,7 +135,7 @@ class TestSqoopOperator:
             hcatalog_database="default",
             hcatalog_table="import_table_3",
             create_hcatalog_table=True,
-            extra_import_options={
+            extra_options={
                 "hcatalog-storage-stanza": '"stored as orcfile"',
                 "hive-partition-key": "day",
                 "hive-partition-value": "2017-10-18",
@@ -155,7 +153,7 @@ class TestSqoopOperator:
             num_mappers=None,
             hcatalog_database="default",
             hcatalog_table="hive_export_table_1",
-            extra_export_options=None,
+            extra_options=None,
             dag=self.dag,
         )
 
@@ -168,7 +166,7 @@ class TestSqoopOperator:
             direct=True,  # speeds up for data transfer
             verbose=True,
             num_mappers=None,
-            extra_export_options=None,
+            extra_options=None,
             dag=self.dag,
         )
 

Reply via email to