This is an automated email from the ASF dual-hosted git repository.

johnbodley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/master by this push:
     new b0f8f6b  fix(hive): Use parquet rather than textfile when uploading 
CSV files to Hive (#14240)
b0f8f6b is described below

commit b0f8f6b6ad0b59e10b843f1c0bb892900dcacb01
Author: John Bodley <[email protected]>
AuthorDate: Sat Apr 24 18:17:30 2021 +1200

    fix(hive): Use parquet rather than textfile when uploading CSV files to 
Hive (#14240)
    
    * fix(hive): Use parquet rather than textfile when uploading CSV files
    
    * [csv/excel]: Use stream rather than temporary file
    
    Co-authored-by: John Bodley <[email protected]>
---
 superset/db_engine_specs/base.py        |  83 ++++++-----------
 superset/db_engine_specs/bigquery.py    |  54 +++++++----
 superset/db_engine_specs/hive.py        | 160 ++++++++++++++------------------
 superset/views/database/views.py        | 140 ++++++++++++----------------
 tests/csv_upload_tests.py               |  33 +++----
 tests/db_engine_specs/bigquery_tests.py |  34 ++++---
 tests/db_engine_specs/hive_tests.py     | 122 +++++-------------------
 7 files changed, 243 insertions(+), 383 deletions(-)

diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py
index 970f510..cfc3060 100644
--- a/superset/db_engine_specs/base.py
+++ b/superset/db_engine_specs/base.py
@@ -618,50 +618,41 @@ class BaseEngineSpec:  # pylint: 
disable=too-many-public-methods
         parsed_query = sql_parse.ParsedQuery(sql)
         return parsed_query.set_or_update_query_limit(limit)
 
-    @staticmethod
-    def csv_to_df(**kwargs: Any) -> pd.DataFrame:
-        """Read csv into Pandas DataFrame
-        :param kwargs: params to be passed to DataFrame.read_csv
-        :return: Pandas DataFrame containing data from csv
-        """
-        kwargs["encoding"] = "utf-8"
-        kwargs["iterator"] = True
-        chunks = pd.read_csv(**kwargs)
-        df = pd.concat(chunk for chunk in chunks)
-        return df
-
-    @classmethod
-    def df_to_sql(cls, df: pd.DataFrame, **kwargs: Any) -> None:
-        """Upload data from a Pandas DataFrame to a database. For
-        regular engines this calls the DataFrame.to_sql() method. Can be
-        overridden for engines that don't work well with to_sql(), e.g.
-        BigQuery.
-        :param df: Dataframe with data to be uploaded
-        :param kwargs: kwargs to be passed to to_sql() method
-        """
-        df.to_sql(**kwargs)
-
     @classmethod
-    def create_table_from_csv(  # pylint: disable=too-many-arguments
+    def df_to_sql(
         cls,
-        filename: str,
-        table: Table,
         database: "Database",
-        csv_to_df_kwargs: Dict[str, Any],
-        df_to_sql_kwargs: Dict[str, Any],
+        table: Table,
+        df: pd.DataFrame,
+        to_sql_kwargs: Dict[str, Any],
     ) -> None:
         """
-        Create table from contents of a csv. Note: this method does not create
-        metadata for the table.
+        Upload data from a Pandas DataFrame to a database.
+
+        For regular engines this calls the `pandas.DataFrame.to_sql` method. 
Can be
+        overridden for engines that don't work well with this method, e.g. 
Hive and
+        BigQuery.
+
+        Note this method does not create metadata for the table.
+
+        :param database: The database to upload the data to
+        :param table: The table to upload the data to
+        :param df: The dataframe with data to be uploaded
+        :param to_sql_kwargs: The kwargs to be passed to 
pandas.DataFrame.to_sql` method
         """
-        df = cls.csv_to_df(filepath_or_buffer=filename, **csv_to_df_kwargs)
+
         engine = cls.get_engine(database)
+        to_sql_kwargs["name"] = table.table
+
         if table.schema:
-            # only add schema when it is preset and non empty
-            df_to_sql_kwargs["schema"] = table.schema
+
+            # Only add schema when it is preset and non empty.
+            to_sql_kwargs["schema"] = table.schema
+
         if engine.dialect.supports_multivalues_insert:
-            df_to_sql_kwargs["method"] = "multi"
-        cls.df_to_sql(df=df, con=engine, **df_to_sql_kwargs)
+            to_sql_kwargs["method"] = "multi"
+
+        df.to_sql(con=engine, **to_sql_kwargs)
 
     @classmethod
     def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
@@ -675,28 +666,6 @@ class BaseEngineSpec:  # pylint: 
disable=too-many-public-methods
         return None
 
     @classmethod
-    def create_table_from_excel(  # pylint: disable=too-many-arguments
-        cls,
-        filename: str,
-        table: Table,
-        database: "Database",
-        excel_to_df_kwargs: Dict[str, Any],
-        df_to_sql_kwargs: Dict[str, Any],
-    ) -> None:
-        """
-        Create table from contents of a excel. Note: this method does not 
create
-        metadata for the table.
-        """
-        df = pd.read_excel(io=filename, **excel_to_df_kwargs)
-        engine = cls.get_engine(database)
-        if table.schema:
-            # only add schema when it is preset and non empty
-            df_to_sql_kwargs["schema"] = table.schema
-        if engine.dialect.supports_multivalues_insert:
-            df_to_sql_kwargs["method"] = "multi"
-        cls.df_to_sql(df=df, con=engine, **df_to_sql_kwargs)
-
-    @classmethod
     def get_all_datasource_names(
         cls, database: "Database", datasource_type: str
     ) -> List[utils.DatasourceName]:
diff --git a/superset/db_engine_specs/bigquery.py 
b/superset/db_engine_specs/bigquery.py
index 59f61d1..fd34bc9 100644
--- a/superset/db_engine_specs/bigquery.py
+++ b/superset/db_engine_specs/bigquery.py
@@ -26,6 +26,7 @@ from sqlalchemy.sql.expression import ColumnClause
 
 from superset.db_engine_specs.base import BaseEngineSpec
 from superset.errors import SupersetErrorType
+from superset.sql_parse import Table
 from superset.utils import core as utils
 
 if TYPE_CHECKING:
@@ -228,16 +229,26 @@ class BigQueryEngineSpec(BaseEngineSpec):
         return "TIMESTAMP_MILLIS({col})"
 
     @classmethod
-    def df_to_sql(cls, df: pd.DataFrame, **kwargs: Any) -> None:
+    def df_to_sql(
+        cls,
+        database: "Database",
+        table: Table,
+        df: pd.DataFrame,
+        to_sql_kwargs: Dict[str, Any],
+    ) -> None:
         """
-        Upload data from a Pandas DataFrame to BigQuery. Calls
-        `DataFrame.to_gbq()` which requires `pandas_gbq` to be installed.
+        Upload data from a Pandas DataFrame to a database.
 
-        :param df: Dataframe with data to be uploaded
-        :param kwargs: kwargs to be passed to to_gbq() method. Requires that 
`schema`,
-        `name` and `con` are present in kwargs. `name` and `schema` are 
combined
-         and passed to `to_gbq()` as `destination_table`.
+        Calls `pandas_gbq.DataFrame.to_gbq` which requires `pandas_gbq` to be 
installed.
+
+        Note this method does not create metadata for the table.
+
+        :param database: The database to upload the data to
+        :param table: The table to upload the data to
+        :param df: The dataframe with data to be uploaded
+        :param to_sql_kwargs: The kwargs to be passed to 
pandas.DataFrame.to_sql` method
         """
+
         try:
             import pandas_gbq
             from google.oauth2 import service_account
@@ -248,22 +259,25 @@ class BigQueryEngineSpec(BaseEngineSpec):
                 "to upload data to BigQuery"
             )
 
-        if not ("name" in kwargs and "schema" in kwargs and "con" in kwargs):
-            raise Exception("name, schema and con need to be defined in 
kwargs")
+        if not table.schema:
+            raise Exception("The table schema must be defined")
 
-        gbq_kwargs = {}
-        gbq_kwargs["project_id"] = kwargs["con"].engine.url.host
-        gbq_kwargs["destination_table"] = 
f"{kwargs.pop('schema')}.{kwargs.pop('name')}"
+        engine = cls.get_engine(database)
+        to_gbq_kwargs = {"destination_table": str(table), "project_id": 
engine.url.host}
+
+        # Add credentials if they are set on the SQLAlchemy dialect.
+        creds = engine.dialect.credentials_info
 
-        # add credentials if they are set on the SQLAlchemy Dialect:
-        creds = kwargs["con"].dialect.credentials_info
         if creds:
-            credentials = 
service_account.Credentials.from_service_account_info(creds)
-            gbq_kwargs["credentials"] = credentials
+            to_gbq_kwargs[
+                "credentials"
+            ] = service_account.Credentials.from_service_account_info(creds)
 
-        # Only pass through supported kwargs
+        # Only pass through supported kwargs.
         supported_kwarg_keys = {"if_exists"}
+
         for key in supported_kwarg_keys:
-            if key in kwargs:
-                gbq_kwargs[key] = kwargs[key]
-        pandas_gbq.to_gbq(df, **gbq_kwargs)
+            if key in to_sql_kwargs:
+                to_gbq_kwargs[key] = to_sql_kwargs[key]
+
+        pandas_gbq.to_gbq(df, **to_gbq_kwargs)
diff --git a/superset/db_engine_specs/hive.py b/superset/db_engine_specs/hive.py
index 9ceb1e5..6996beb 100644
--- a/superset/db_engine_specs/hive.py
+++ b/superset/db_engine_specs/hive.py
@@ -17,12 +17,16 @@
 import logging
 import os
 import re
+import tempfile
 import time
 from datetime import datetime
 from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING
 from urllib import parse
 
+import numpy as np
 import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
 from flask import g
 from sqlalchemy import Column, text
 from sqlalchemy.engine.base import Engine
@@ -54,6 +58,15 @@ hive_poll_interval = conf.get("HIVE_POLL_INTERVAL")
 
 
 def upload_to_s3(filename: str, upload_prefix: str, table: Table) -> str:
+    """
+    Upload the file to S3.
+
+    :param filename: The file to upload
+    :param upload_prefix: The S3 prefix
+    :param table: The table that will be created
+    :returns: The S3 location of the table
+    """
+
     # Optional dependency
     import boto3  # pylint: disable=import-error
 
@@ -156,89 +169,37 @@ class HiveEngineSpec(PrestoEngineSpec):
             return []
 
     @classmethod
-    def get_create_table_stmt(  # pylint: disable=too-many-arguments
-        cls,
-        table: Table,
-        schema_definition: str,
-        location: str,
-        delim: str,
-        header_line_count: Optional[int],
-        null_values: Optional[List[str]],
-    ) -> text:
-        tblproperties = []
-        # available options:
-        # https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
-        # TODO(bkyryliuk): figure out what to do with the skip rows field.
-        params: Dict[str, str] = {
-            "delim": delim,
-            "location": location,
-        }
-        if header_line_count is not None and header_line_count >= 0:
-            header_line_count += 1
-            tblproperties.append("'skip.header.line.count'=:header_line_count")
-            params["header_line_count"] = str(header_line_count)
-        if null_values:
-            # hive only supports 1 value for the null format
-            tblproperties.append("'serialization.null.format'=:null_value")
-            params["null_value"] = null_values[0]
-
-        if tblproperties:
-            tblproperties_stmt = f"tblproperties ({', '.join(tblproperties)})"
-            sql = f"""CREATE TABLE {str(table)} ( {schema_definition} )
-                ROW FORMAT DELIMITED FIELDS TERMINATED BY :delim
-                STORED AS TEXTFILE LOCATION :location
-                {tblproperties_stmt}"""
-        else:
-            sql = f"""CREATE TABLE {str(table)} ( {schema_definition} )
-                ROW FORMAT DELIMITED FIELDS TERMINATED BY :delim
-                STORED AS TEXTFILE LOCATION :location"""
-        return sql, params
-
-    @classmethod
-    def create_table_from_csv(  # pylint: disable=too-many-arguments, 
too-many-locals
+    def df_to_sql(
         cls,
-        filename: str,
-        table: Table,
         database: "Database",
-        csv_to_df_kwargs: Dict[str, Any],
-        df_to_sql_kwargs: Dict[str, Any],
+        table: Table,
+        df: pd.DataFrame,
+        to_sql_kwargs: Dict[str, Any],
     ) -> None:
-        """Uploads a csv file and creates a superset datasource in Hive."""
-        if_exists = df_to_sql_kwargs["if_exists"]
-        if if_exists == "append":
-            raise SupersetException("Append operation not currently supported")
+        """
+        Upload data from a Pandas DataFrame to a database.
 
-        def convert_to_hive_type(col_type: str) -> str:
-            """maps tableschema's types to hive types"""
-            tableschema_to_hive_types = {
-                "boolean": "BOOLEAN",
-                "integer": "BIGINT",
-                "number": "DOUBLE",
-                "string": "STRING",
-            }
-            return tableschema_to_hive_types.get(col_type, "STRING")
+        The data is stored via the binary Parquet format which is both less 
problematic
+        and more performant than a text file. More specifically storing a 
table as a
+        CSV text file has severe limitations including the fact that the Hive 
CSV SerDe
+        does not support multiline fields.
 
-        upload_prefix = config["CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC"](
-            database, g.user, table.schema
-        )
+        Note this method does not create metadata for the table.
 
-        # Optional dependency
-        from tableschema import (  # pylint: disable=import-error
-            Table as TableSchemaTable,
-        )
+        :param database: The database to upload the data to
+        :param: table The table to upload the data to
+        :param df: The dataframe with data to be uploaded
+        :param to_sql_kwargs: The kwargs to be passed to 
pandas.DataFrame.to_sql` method
+        """
 
-        hive_table_schema = TableSchemaTable(filename).infer()
-        column_name_and_type = []
-        for column_info in hive_table_schema["fields"]:
-            column_name_and_type.append(
-                "`{}` {}".format(
-                    column_info["name"], 
convert_to_hive_type(column_info["type"])
-                )
-            )
-        schema_definition = ", ".join(column_name_and_type)
+        engine = cls.get_engine(database)
+
+        if to_sql_kwargs["if_exists"] == "append":
+            raise SupersetException("Append operation not currently supported")
 
-        # ensure table doesn't already exist
-        if if_exists == "fail":
+        if to_sql_kwargs["if_exists"] == "fail":
+
+            # Ensure table doesn't already exist.
             if table.schema:
                 table_exists = not database.get_df(
                     f"SHOW TABLES IN {table.schema} LIKE '{table.table}'"
@@ -247,24 +208,47 @@ class HiveEngineSpec(PrestoEngineSpec):
                 table_exists = not database.get_df(
                     f"SHOW TABLES LIKE '{table.table}'"
                 ).empty
+
             if table_exists:
                 raise SupersetException("Table already exists")
+        elif to_sql_kwargs["if_exists"] == "replace":
+            engine.execute(f"DROP TABLE IF EXISTS {str(table)}")
 
-        engine = cls.get_engine(database)
+        def _get_hive_type(dtype: np.dtype) -> str:
+            hive_type_by_dtype = {
+                np.dtype("bool"): "BOOLEAN",
+                np.dtype("float64"): "DOUBLE",
+                np.dtype("int64"): "BIGINT",
+                np.dtype("object"): "STRING",
+            }
 
-        if if_exists == "replace":
-            engine.execute(f"DROP TABLE IF EXISTS {str(table)}")
-        location = upload_to_s3(filename, upload_prefix, table)
-        sql, params = cls.get_create_table_stmt(
-            table,
-            schema_definition,
-            location,
-            csv_to_df_kwargs["sep"].encode().decode("unicode_escape"),
-            int(csv_to_df_kwargs.get("header", 0)),
-            csv_to_df_kwargs.get("na_values"),
+            return hive_type_by_dtype.get(dtype, "STRING")
+
+        schema_definition = ", ".join(
+            f"`{name}` {_get_hive_type(dtype)}" for name, dtype in 
df.dtypes.items()
         )
-        engine = cls.get_engine(database)
-        engine.execute(text(sql), **params)
+
+        with tempfile.NamedTemporaryFile(
+            dir=config["UPLOAD_FOLDER"], suffix=".parquet"
+        ) as file:
+            pq.write_table(pa.Table.from_pandas(df), where=file.name)
+
+            engine.execute(
+                text(
+                    f"""
+                    CREATE TABLE {str(table)} ({schema_definition})
+                    STORED AS PARQUET
+                    LOCATION :location
+                    """
+                ),
+                location=upload_to_s3(
+                    filename=file.name,
+                    upload_prefix=config["CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC"](
+                        database, g.user, table.schema
+                    ),
+                    table=table,
+                ),
+            )
 
     @classmethod
     def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
diff --git a/superset/views/database/views.py b/superset/views/database/views.py
index 3a68f32..e3c3f92 100644
--- a/superset/views/database/views.py
+++ b/superset/views/database/views.py
@@ -18,6 +18,7 @@ import os
 import tempfile
 from typing import TYPE_CHECKING
 
+import pandas as pd
 from flask import flash, g, redirect
 from flask_appbuilder import expose, SimpleFormView
 from flask_appbuilder.models.sqla.interface import SQLAInterface
@@ -149,55 +150,44 @@ class CsvToDatabaseView(SimpleFormView):
             flash(message, "danger")
             return redirect("/csvtodatabaseview/form")
 
-        uploaded_tmp_file_path = tempfile.NamedTemporaryFile(
-            dir=app.config["UPLOAD_FOLDER"],
-            suffix=os.path.splitext(form.csv_file.data.filename)[1].lower(),
-            delete=False,
-        ).name
-
         try:
-            utils.ensure_path_exists(config["UPLOAD_FOLDER"])
-            upload_stream_write(form.csv_file.data, uploaded_tmp_file_path)
+            df = pd.concat(
+                pd.read_csv(
+                    chunksize=1000,
+                    encoding="utf-8",
+                    filepath_or_buffer=form.csv_file.data,
+                    header=form.header.data if form.header.data else 0,
+                    index_col=form.index_col.data,
+                    infer_datetime_format=form.infer_datetime_format.data,
+                    iterator=True,
+                    keep_default_na=not form.null_values.data,
+                    mangle_dupe_cols=form.mangle_dupe_cols.data,
+                    na_values=form.null_values.data if form.null_values.data 
else None,
+                    nrows=form.nrows.data,
+                    parse_dates=form.parse_dates.data,
+                    sep=form.sep.data,
+                    skip_blank_lines=form.skip_blank_lines.data,
+                    skipinitialspace=form.skipinitialspace.data,
+                    skiprows=form.skiprows.data,
+                )
+            )
 
-            con = form.data.get("con")
             database = (
-                
db.session.query(models.Database).filter_by(id=con.data.get("id")).one()
+                db.session.query(models.Database)
+                .filter_by(id=form.data.get("con").data.get("id"))
+                .one()
             )
 
-            # More can be found here:
-            # 
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
-            csv_to_df_kwargs = {
-                "sep": form.sep.data,
-                "header": form.header.data if form.header.data else 0,
-                "index_col": form.index_col.data,
-                "mangle_dupe_cols": form.mangle_dupe_cols.data,
-                "skipinitialspace": form.skipinitialspace.data,
-                "skiprows": form.skiprows.data,
-                "nrows": form.nrows.data,
-                "skip_blank_lines": form.skip_blank_lines.data,
-                "parse_dates": form.parse_dates.data,
-                "infer_datetime_format": form.infer_datetime_format.data,
-                "chunksize": 1000,
-            }
-            if form.null_values.data:
-                csv_to_df_kwargs["na_values"] = form.null_values.data
-                csv_to_df_kwargs["keep_default_na"] = False
-
-            # More can be found here:
-            # 
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_sql.html
-            df_to_sql_kwargs = {
-                "name": csv_table.table,
-                "if_exists": form.if_exists.data,
-                "index": form.index.data,
-                "index_label": form.index_label.data,
-                "chunksize": 1000,
-            }
-            database.db_engine_spec.create_table_from_csv(
-                uploaded_tmp_file_path,
-                csv_table,
+            database.db_engine_spec.df_to_sql(
                 database,
-                csv_to_df_kwargs,
-                df_to_sql_kwargs,
+                csv_table,
+                df,
+                to_sql_kwargs={
+                    "chunksize": 1000,
+                    "if_exists": form.if_exists.data,
+                    "index": form.index.data,
+                    "index_label": form.index_label.data,
+                },
             )
 
             # Connect table to the database that should be used for 
exploration.
@@ -236,10 +226,6 @@ class CsvToDatabaseView(SimpleFormView):
             db.session.commit()
         except Exception as ex:  # pylint: disable=broad-except
             db.session.rollback()
-            try:
-                os.remove(uploaded_tmp_file_path)
-            except OSError:
-                pass
             message = _(
                 'Unable to upload CSV file "%(filename)s" to table '
                 '"%(table_name)s" in database "%(db_name)s". '
@@ -254,7 +240,6 @@ class CsvToDatabaseView(SimpleFormView):
             stats_logger.incr("failed_csv_upload")
             return redirect("/csvtodatabaseview/form")
 
-        os.remove(uploaded_tmp_file_path)
         # Go back to welcome page / splash screen
         message = _(
             'CSV file "%(csv_filename)s" uploaded to table "%(table_name)s" in 
'
@@ -316,40 +301,34 @@ class ExcelToDatabaseView(SimpleFormView):
             utils.ensure_path_exists(config["UPLOAD_FOLDER"])
             upload_stream_write(form.excel_file.data, uploaded_tmp_file_path)
 
-            con = form.data.get("con")
+            df = pd.read_excel(
+                header=form.header.data if form.header.data else 0,
+                index_col=form.index_col.data,
+                io=form.excel_file.data,
+                keep_default_na=not form.null_values.data,
+                mangle_dupe_cols=form.mangle_dupe_cols.data,
+                na_values=form.null_values.data if form.null_values.data else 
None,
+                parse_dates=form.parse_dates.data,
+                skiprows=form.skiprows.data,
+                sheet_name=form.sheet_name.data if form.sheet_name.data else 0,
+            )
+
             database = (
-                
db.session.query(models.Database).filter_by(id=con.data.get("id")).one()
+                db.session.query(models.Database)
+                .filter_by(id=form.data.get("con").data.get("id"))
+                .one()
             )
 
-            # some params are not supported by pandas.read_excel (e.g. 
chunksize).
-            # More can be found here:
-            # 
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_excel.html
-            excel_to_df_kwargs = {
-                "header": form.header.data if form.header.data else 0,
-                "index_col": form.index_col.data,
-                "mangle_dupe_cols": form.mangle_dupe_cols.data,
-                "skiprows": form.skiprows.data,
-                "nrows": form.nrows.data,
-                "sheet_name": form.sheet_name.data if form.sheet_name.data 
else 0,
-                "parse_dates": form.parse_dates.data,
-            }
-            if form.null_values.data:
-                excel_to_df_kwargs["na_values"] = form.null_values.data
-                excel_to_df_kwargs["keep_default_na"] = False
-
-            df_to_sql_kwargs = {
-                "name": excel_table.table,
-                "if_exists": form.if_exists.data,
-                "index": form.index.data,
-                "index_label": form.index_label.data,
-                "chunksize": 1000,
-            }
-            database.db_engine_spec.create_table_from_excel(
-                uploaded_tmp_file_path,
-                excel_table,
+            database.db_engine_spec.df_to_sql(
                 database,
-                excel_to_df_kwargs,
-                df_to_sql_kwargs,
+                excel_table,
+                df,
+                to_sql_kwargs={
+                    "chunksize": 1000,
+                    "if_exists": form.if_exists.data,
+                    "index": form.index.data,
+                    "index_label": form.index_label.data,
+                },
             )
 
             # Connect table to the database that should be used for 
exploration.
@@ -388,10 +367,6 @@ class ExcelToDatabaseView(SimpleFormView):
             db.session.commit()
         except Exception as ex:  # pylint: disable=broad-except
             db.session.rollback()
-            try:
-                os.remove(uploaded_tmp_file_path)
-            except OSError:
-                pass
             message = _(
                 'Unable to upload Excel file "%(filename)s" to table '
                 '"%(table_name)s" in database "%(db_name)s". '
@@ -406,7 +381,6 @@ class ExcelToDatabaseView(SimpleFormView):
             stats_logger.incr("failed_excel_upload")
             return redirect("/exceltodatabaseview/form")
 
-        os.remove(uploaded_tmp_file_path)
         # Go back to welcome page / splash screen
         message = _(
             'Excel file "%(excel_filename)s" uploaded to table 
"%(table_name)s" in '
diff --git a/tests/csv_upload_tests.py b/tests/csv_upload_tests.py
index 229a74f..c12ff44 100644
--- a/tests/csv_upload_tests.py
+++ b/tests/csv_upload_tests.py
@@ -134,13 +134,14 @@ def upload_excel(
     return get_resp(test_client, "/exceltodatabaseview/form", data=form_data)
 
 
-def mock_upload_to_s3(f: str, p: str, t: Table) -> str:
-    """ HDFS is used instead of S3 for the unit tests.
+def mock_upload_to_s3(filename: str, upload_prefix: str, table: Table) -> str:
+    """
+    HDFS is used instead of S3 for the unit tests.
 
-    :param f: filepath
-    :param p: unused parameter
-    :param t: table that will be created
-    :return: hdfs path to the directory with external table files
+    :param filename: The file to upload
+    :param upload_prefix: The S3 prefix
+    :param table: The table that will be created
+    :returns: The HDFS path to the directory with external table files
     """
     # only needed for the hive tests
     import docker
@@ -148,11 +149,11 @@ def mock_upload_to_s3(f: str, p: str, t: Table) -> str:
     client = docker.from_env()
     container = client.containers.get("namenode")
     # docker mounted volume that contains csv uploads
-    src = os.path.join("/tmp/superset_uploads", os.path.basename(f))
+    src = os.path.join("/tmp/superset_uploads", os.path.basename(filename))
     # hdfs destination for the external tables
-    dest_dir = os.path.join("/tmp/external/superset_uploads/", str(t))
+    dest_dir = os.path.join("/tmp/external/superset_uploads/", str(table))
     container.exec_run(f"hdfs dfs -mkdir -p {dest_dir}")
-    dest = os.path.join(dest_dir, os.path.basename(f))
+    dest = os.path.join(dest_dir, os.path.basename(filename))
     container.exec_run(f"hdfs dfs -put {src} {dest}")
     # hive external table expectes a directory for the location
     return dest_dir
@@ -279,23 +280,13 @@ def test_import_csv(setup_csv_upload, create_csv_files):
     # make sure that john and empty string are replaced with None
     engine = get_upload_db().get_sqla_engine()
     data = engine.execute(f"SELECT * from {CSV_UPLOAD_TABLE}").fetchall()
-    if utils.backend() == "hive":
-        # Be aware that hive only uses first value from the null values list.
-        # It is hive database engine limitation.
-        # TODO(bkyryliuk): preprocess csv file for hive upload to match 
default engine capabilities.
-        assert data == [("john", 1, "x"), ("paul", 2, None)]
-    else:
-        assert data == [(None, 1, "x"), ("paul", 2, None)]
+    assert data == [(None, 1, "x"), ("paul", 2, None)]
 
     # default null values
     upload_csv(CSV_FILENAME2, CSV_UPLOAD_TABLE, extra={"if_exists": "replace"})
     # make sure that john and empty string are replaced with None
     data = engine.execute(f"SELECT * from {CSV_UPLOAD_TABLE}").fetchall()
-    if utils.backend() == "hive":
-        # By default hive does not convert values to null vs other databases.
-        assert data == [("john", 1, "x"), ("paul", 2, "")]
-    else:
-        assert data == [("john", 1, "x"), ("paul", 2, None)]
+    assert data == [("john", 1, "x"), ("paul", 2, None)]
 
 
 @mock.patch("superset.db_engine_specs.hive.upload_to_s3", mock_upload_to_s3)
diff --git a/tests/db_engine_specs/bigquery_tests.py 
b/tests/db_engine_specs/bigquery_tests.py
index d015007..81a9f06 100644
--- a/tests/db_engine_specs/bigquery_tests.py
+++ b/tests/db_engine_specs/bigquery_tests.py
@@ -23,6 +23,7 @@ from sqlalchemy import column
 from superset.db_engine_specs.base import BaseEngineSpec
 from superset.db_engine_specs.bigquery import BigQueryEngineSpec
 from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.sql_parse import Table
 from tests.db_engine_specs.base_tests import TestDbEngineSpec
 
 
@@ -166,21 +167,23 @@ class TestBigQueryDbEngineSpec(TestDbEngineSpec):
             [{"name": "partition", "column_names": ["dttm"], "unique": False}],
         )
 
-    def test_df_to_sql(self):
+    
@mock.patch("superset.db_engine_specs.bigquery.BigQueryEngineSpec.get_engine")
+    def test_df_to_sql(self, mock_get_engine):
         """
         DB Eng Specs (bigquery): Test DataFrame to SQL contract
         """
         # test missing google.oauth2 dependency
         sys.modules["pandas_gbq"] = mock.MagicMock()
         df = DataFrame()
+        database = mock.MagicMock()
         self.assertRaisesRegexp(
             Exception,
             "Could not import libraries",
             BigQueryEngineSpec.df_to_sql,
-            df,
-            con="some_connection",
-            schema="schema",
-            name="name",
+            database=database,
+            table=Table(table="name", schema="schema"),
+            df=df,
+            to_sql_kwargs={},
         )
 
         invalid_kwargs = [
@@ -191,15 +194,17 @@ class TestBigQueryDbEngineSpec(TestDbEngineSpec):
             {"name": "some_name", "schema": "some_schema"},
             {"con": "some_con", "schema": "some_schema"},
         ]
-        # Test check for missing required kwargs (name, schema, con)
+        # Test check for missing schema.
         sys.modules["google.oauth2"] = mock.MagicMock()
         for invalid_kwarg in invalid_kwargs:
             self.assertRaisesRegexp(
                 Exception,
-                "name, schema and con need to be defined in kwargs",
+                "The table schema must be defined",
                 BigQueryEngineSpec.df_to_sql,
-                df,
-                **invalid_kwarg,
+                database=database,
+                table=Table(table="name"),
+                df=df,
+                to_sql_kwargs=invalid_kwarg,
             )
 
         import pandas_gbq
@@ -209,12 +214,15 @@ class TestBigQueryDbEngineSpec(TestDbEngineSpec):
         service_account.Credentials.from_service_account_info = mock.MagicMock(
             return_value="account_info"
         )
-        connection = mock.Mock()
-        connection.engine.url.host = "google-host"
-        connection.dialect.credentials_info = "secrets"
+
+        mock_get_engine.return_value.url.host = "google-host"
+        mock_get_engine.return_value.dialect.credentials_info = "secrets"
 
         BigQueryEngineSpec.df_to_sql(
-            df, con=connection, schema="schema", name="name", 
if_exists="extra_key"
+            database=database,
+            table=Table(table="name", schema="schema"),
+            df=df,
+            to_sql_kwargs={"if_exists": "extra_key"},
         )
 
         pandas_gbq.to_gbq.assert_called_with(
diff --git a/tests/db_engine_specs/hive_tests.py 
b/tests/db_engine_specs/hive_tests.py
index 51306dc..4d50518 100644
--- a/tests/db_engine_specs/hive_tests.py
+++ b/tests/db_engine_specs/hive_tests.py
@@ -163,11 +163,10 @@ def test_convert_dttm():
     )
 
 
-def test_create_table_from_csv_append() -> None:
-
+def test_df_to_csv() -> None:
     with pytest.raises(SupersetException):
-        HiveEngineSpec.create_table_from_csv(
-            "foo.csv", Table("foobar"), mock.MagicMock(), {}, {"if_exists": 
"append"}
+        HiveEngineSpec.df_to_sql(
+            mock.MagicMock(), Table("foobar"), pd.DataFrame(), {"if_exists": 
"append"},
         )
 
 
@@ -176,15 +175,13 @@ def test_create_table_from_csv_append() -> None:
     {**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: ""},
 )
 @mock.patch("superset.db_engine_specs.hive.g", spec={})
[email protected]("tableschema.Table")
-def test_create_table_from_csv_if_exists_fail(mock_table, mock_g):
-    mock_table.infer.return_value = {}
+def test_df_to_sql_if_exists_fail(mock_g):
     mock_g.user = True
     mock_database = mock.MagicMock()
     mock_database.get_df.return_value.empty = False
     with pytest.raises(SupersetException, match="Table already exists"):
-        HiveEngineSpec.create_table_from_csv(
-            "foo.csv", Table("foobar"), mock_database, {}, {"if_exists": 
"fail"}
+        HiveEngineSpec.df_to_sql(
+            mock_database, Table("foobar"), pd.DataFrame(), {"if_exists": 
"fail"}
         )
 
 
@@ -193,18 +190,15 @@ def test_create_table_from_csv_if_exists_fail(mock_table, 
mock_g):
     {**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: ""},
 )
 @mock.patch("superset.db_engine_specs.hive.g", spec={})
[email protected]("tableschema.Table")
-def test_create_table_from_csv_if_exists_fail_with_schema(mock_table, mock_g):
-    mock_table.infer.return_value = {}
+def test_df_to_sql_if_exists_fail_with_schema(mock_g):
     mock_g.user = True
     mock_database = mock.MagicMock()
     mock_database.get_df.return_value.empty = False
     with pytest.raises(SupersetException, match="Table already exists"):
-        HiveEngineSpec.create_table_from_csv(
-            "foo.csv",
-            Table(table="foobar", schema="schema"),
+        HiveEngineSpec.df_to_sql(
             mock_database,
-            {},
+            Table(table="foobar", schema="schema"),
+            pd.DataFrame(),
             {"if_exists": "fail"},
         )
 
@@ -214,11 +208,9 @@ def 
test_create_table_from_csv_if_exists_fail_with_schema(mock_table, mock_g):
     {**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: ""},
 )
 @mock.patch("superset.db_engine_specs.hive.g", spec={})
[email protected]("tableschema.Table")
 @mock.patch("superset.db_engine_specs.hive.upload_to_s3")
-def test_create_table_from_csv_if_exists_replace(mock_upload_to_s3, 
mock_table, mock_g):
+def test_df_to_sql_if_exists_replace(mock_upload_to_s3, mock_g):
     mock_upload_to_s3.return_value = "mock-location"
-    mock_table.infer.return_value = {}
     mock_g.user = True
     mock_database = mock.MagicMock()
     mock_database.get_df.return_value.empty = False
@@ -226,12 +218,11 @@ def 
test_create_table_from_csv_if_exists_replace(mock_upload_to_s3, mock_table,
     mock_database.get_sqla_engine.return_value.execute = mock_execute
     table_name = "foobar"
 
-    HiveEngineSpec.create_table_from_csv(
-        "foo.csv",
-        Table(table=table_name),
+    HiveEngineSpec.df_to_sql(
         mock_database,
-        {"sep": "mock", "header": 1, "na_values": "mock"},
-        {"if_exists": "replace"},
+        Table(table=table_name),
+        pd.DataFrame(),
+        {"if_exists": "replace", "header": 1, "na_values": "mock", "sep": 
"mock"},
     )
 
     mock_execute.assert_any_call(f"DROP TABLE IF EXISTS {table_name}")
@@ -242,13 +233,9 @@ def 
test_create_table_from_csv_if_exists_replace(mock_upload_to_s3, mock_table,
     {**app.config, "CSV_TO_HIVE_UPLOAD_DIRECTORY_FUNC": lambda *args: ""},
 )
 @mock.patch("superset.db_engine_specs.hive.g", spec={})
[email protected]("tableschema.Table")
 @mock.patch("superset.db_engine_specs.hive.upload_to_s3")
-def test_create_table_from_csv_if_exists_replace_with_schema(
-    mock_upload_to_s3, mock_table, mock_g
-):
+def test_df_to_sql_if_exists_replace_with_schema(mock_upload_to_s3, mock_g):
     mock_upload_to_s3.return_value = "mock-location"
-    mock_table.infer.return_value = {}
     mock_g.user = True
     mock_database = mock.MagicMock()
     mock_database.get_df.return_value.empty = False
@@ -256,84 +243,17 @@ def 
test_create_table_from_csv_if_exists_replace_with_schema(
     mock_database.get_sqla_engine.return_value.execute = mock_execute
     table_name = "foobar"
     schema = "schema"
-    HiveEngineSpec.create_table_from_csv(
-        "foo.csv",
-        Table(table=table_name, schema=schema),
+
+    HiveEngineSpec.df_to_sql(
         mock_database,
-        {"sep": "mock", "header": 1, "na_values": "mock"},
-        {"if_exists": "replace"},
+        Table(table=table_name, schema=schema),
+        pd.DataFrame(),
+        {"if_exists": "replace", "header": 1, "na_values": "mock", "sep": 
"mock"},
     )
 
     mock_execute.assert_any_call(f"DROP TABLE IF EXISTS {schema}.{table_name}")
 
 
-def test_get_create_table_stmt() -> None:
-    table = Table("employee")
-    schema_def = """eid int, name String, salary String, destination String"""
-    location = "s3a://directory/table"
-    from unittest import TestCase
-
-    assert HiveEngineSpec.get_create_table_stmt(
-        table, schema_def, location, ",", 0, [""]
-    ) == (
-        """CREATE TABLE employee ( eid int, name String, salary String, 
destination String )
-                ROW FORMAT DELIMITED FIELDS TERMINATED BY :delim
-                STORED AS TEXTFILE LOCATION :location
-                tblproperties ('skip.header.line.count'=:header_line_count, 
'serialization.null.format'=:null_value)""",
-        {
-            "delim": ",",
-            "location": "s3a://directory/table",
-            "header_line_count": "1",
-            "null_value": "",
-        },
-    )
-    assert HiveEngineSpec.get_create_table_stmt(
-        table, schema_def, location, ",", 1, ["1", "2"]
-    ) == (
-        """CREATE TABLE employee ( eid int, name String, salary String, 
destination String )
-                ROW FORMAT DELIMITED FIELDS TERMINATED BY :delim
-                STORED AS TEXTFILE LOCATION :location
-                tblproperties ('skip.header.line.count'=:header_line_count, 
'serialization.null.format'=:null_value)""",
-        {
-            "delim": ",",
-            "location": "s3a://directory/table",
-            "header_line_count": "2",
-            "null_value": "1",
-        },
-    )
-    assert HiveEngineSpec.get_create_table_stmt(
-        table, schema_def, location, ",", 100, ["NaN"]
-    ) == (
-        """CREATE TABLE employee ( eid int, name String, salary String, 
destination String )
-                ROW FORMAT DELIMITED FIELDS TERMINATED BY :delim
-                STORED AS TEXTFILE LOCATION :location
-                tblproperties ('skip.header.line.count'=:header_line_count, 
'serialization.null.format'=:null_value)""",
-        {
-            "delim": ",",
-            "location": "s3a://directory/table",
-            "header_line_count": "101",
-            "null_value": "NaN",
-        },
-    )
-    assert HiveEngineSpec.get_create_table_stmt(
-        table, schema_def, location, ",", None, None
-    ) == (
-        """CREATE TABLE employee ( eid int, name String, salary String, 
destination String )
-                ROW FORMAT DELIMITED FIELDS TERMINATED BY :delim
-                STORED AS TEXTFILE LOCATION :location""",
-        {"delim": ",", "location": "s3a://directory/table"},
-    )
-    assert HiveEngineSpec.get_create_table_stmt(
-        table, schema_def, location, ",", 100, []
-    ) == (
-        """CREATE TABLE employee ( eid int, name String, salary String, 
destination String )
-                ROW FORMAT DELIMITED FIELDS TERMINATED BY :delim
-                STORED AS TEXTFILE LOCATION :location
-                tblproperties ('skip.header.line.count'=:header_line_count)""",
-        {"delim": ",", "location": "s3a://directory/table", 
"header_line_count": "101"},
-    )
-
-
 def test_is_readonly():
     def is_readonly(sql: str) -> bool:
         return HiveEngineSpec.is_readonly_query(ParsedQuery(sql))

Reply via email to