This is an automated email from the ASF dual-hosted git repository.

maximebeauchemin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 268edcf  Import CSV (#3643)
268edcf is described below

commit 268edcfedd0eae0dcb33bdd1e14795f602ab4532
Author: timifasubaa <[email protected]>
AuthorDate: Mon Nov 27 21:07:12 2017 -0800

    Import CSV (#3643)
    
    * add upload csv button to sources dropdown
    
    * upload csv to non-hive datasources
    
    * upload csv to hive datasource
    
    * update FAQ page
    
    * add tests
    
    * fix linting errors and merge conflicts
    
    * Update .travis.yml
    
    * Update tox.ini
---
 docs/faq.rst                |   7 +++
 superset/config.py          |  16 +++++-
 superset/db_engine_specs.py | 109 ++++++++++++++++++++++++++++++++++++++-
 superset/forms.py           | 123 ++++++++++++++++++++++++++++++++++++++++++++
 superset/views/core.py      |  81 ++++++++++++++++++++++++++++-
 tests/core_tests.py         |  42 +++++++++++++++
 tox.ini                     |   2 +-
 7 files changed, 375 insertions(+), 5 deletions(-)

diff --git a/docs/faq.rst b/docs/faq.rst
index 0ca341e..b6520b9 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -45,6 +45,13 @@ visualizations.
 https://github.com/airbnb/superset/issues?q=label%3Aexample+is%3Aclosed
 
 
+Can I upload and visualize csv data?
+-------------------------------------
+
+Yes, using the ``Upload a CSV`` button under the ``Sources``
+menu item. This brings up a form that allows you to specify required information. After creating the table from CSV, it can then be loaded like any other on the ``Sources -> Tables`` page.
+
+
 Why are my queries timing out?
 ------------------------------
 
diff --git a/superset/config.py b/superset/config.py
index ebc4777..0e0bd9b 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -58,9 +58,9 @@ SQLALCHEMY_TRACK_MODIFICATIONS = False
 SECRET_KEY = '\2\1thisismyscretkey\1\2\e\y\y\h'  # noqa
 
 # The SQLAlchemy connection string.
-SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db')
+# SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db')
 # SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp'
-# SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
+SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
 
 # In order to hook up a custom password store for all SQLACHEMY connections
 # implement a function that takes a single argument of type 'sqla.engine.url',
@@ -188,6 +188,10 @@ TABLE_NAMES_CACHE_CONFIG = {'CACHE_TYPE': 'null'}
 ENABLE_CORS = False
 CORS_OPTIONS = {}
 
+# Allowed format types for upload on Database view
+# TODO: Add processing of other spreadsheet formats (xls, xlsx etc)
+ALLOWED_EXTENSIONS = set(['csv'])
+
 # CSV Options: key/value pairs that will be passed as argument to DataFrame.to_csv method
 # note: index option should not be overridden
 CSV_EXPORT = {
@@ -298,6 +302,14 @@ SQLLAB_ASYNC_TIME_LIMIT_SEC = 60 * 60 * 6
 # in SQL Lab by using the "Run Async" button/feature
 RESULTS_BACKEND = None
 
+# The S3 bucket where you want to store your external hive tables created
+# from CSV files. For example, 'companyname-superset'
+CSV_TO_HIVE_UPLOAD_S3_BUCKET = None
+
+# The directory within the bucket specified above that will
+# contain all the external tables
+CSV_TO_HIVE_UPLOAD_DIRECTORY = 'EXTERNAL_HIVE_TABLES/'
+
 # A dictionary of items that gets merged into the Jinja context for
 # SQL Lab. The existing context gets updated with this dictionary,
 # meaning values for existing keys get overwritten by the content of this
diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py
index ba20cce..51fa055 100644
--- a/superset/db_engine_specs.py
+++ b/superset/db_engine_specs.py
@@ -17,21 +17,30 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 from collections import defaultdict, namedtuple
+import csv
 import inspect
 import logging
+import os
 import re
 import textwrap
 import time
 
+import boto3
+from flask import g
 from flask_babel import lazy_gettext as _
+import pandas
 from sqlalchemy import select
+from sqlalchemy.engine import create_engine
 from sqlalchemy.engine.url import make_url
 from sqlalchemy.sql import text
 import sqlparse
+from werkzeug.utils import secure_filename
 
-from superset import cache_util, conf, utils
+from superset import app, cache_util, conf, db, utils
 from superset.utils import QueryStatus, SupersetTemplateException
 
+config = app.config
+
 tracking_url_trans = conf.get('TRACKING_URL_TRANSFORMER')
 
 Grain = namedtuple('Grain', 'name label function')
@@ -73,6 +82,65 @@ class BaseEngineSpec(object):
         """Returns engine-specific table metadata"""
         return {}
 
+    @staticmethod
+    def csv_to_df(**kwargs):
+        kwargs['filepath_or_buffer'] = \
+            app.config['UPLOAD_FOLDER'] + kwargs['filepath_or_buffer']
+        kwargs['encoding'] = 'utf-8'
+        kwargs['iterator'] = True
+        chunks = pandas.read_csv(**kwargs)
+        df = pandas.DataFrame()
+        df = pandas.concat(chunk for chunk in chunks)
+        return df
+
+    @staticmethod
+    def df_to_db(df, table, **kwargs):
+        df.to_sql(**kwargs)
+        table.user_id = g.user.id
+        table.schema = kwargs['schema']
+        table.fetch_metadata()
+        db.session.add(table)
+        db.session.commit()
+
+    @staticmethod
+    def create_table_from_csv(form, table):
+        def _allowed_file(filename):
+            # Only allow specific file extensions as specified in the config
+            extension = os.path.splitext(filename)[1]
+            return extension and extension[1:] in 
app.config['ALLOWED_EXTENSIONS']
+
+        filename = secure_filename(form.csv_file.data.filename)
+        if not _allowed_file(filename):
+            raise Exception('Invalid file type selected')
+        kwargs = {
+            'filepath_or_buffer': filename,
+            'sep': form.sep.data,
+            'header': form.header.data if form.header.data else 0,
+            'index_col': form.index_col.data,
+            'mangle_dupe_cols': form.mangle_dupe_cols.data,
+            'skipinitialspace': form.skipinitialspace.data,
+            'skiprows': form.skiprows.data,
+            'nrows': form.nrows.data,
+            'skip_blank_lines': form.skip_blank_lines.data,
+            'parse_dates': form.parse_dates.data,
+            'infer_datetime_format': form.infer_datetime_format.data,
+            'chunksize': 10000,
+        }
+        df = BaseEngineSpec.csv_to_df(**kwargs)
+
+        df_to_db_kwargs = {
+            'table': table,
+            'df': df,
+            'name': form.name.data,
+            'con': create_engine(form.con.data, echo=False),
+            'schema': form.schema.data,
+            'if_exists': form.if_exists.data,
+            'index': form.index.data,
+            'index_label': form.index_label.data,
+            'chunksize': 10000,
+        }
+        BaseEngineSpec.df_to_db(**df_to_db_kwargs)
+
     @classmethod
     def escape_sql(cls, sql):
         """Escapes the raw SQL"""
@@ -721,6 +789,45 @@ class HiveEngineSpec(PrestoEngineSpec):
         return BaseEngineSpec.fetch_result_sets(
             db, datasource_type, force=force)
 
+    @staticmethod
+    def create_table_from_csv(form, table):
+        """Uploads a csv file and creates a superset datasource in Hive."""
+        def get_column_names(filepath):
+            with open(filepath, 'rb') as f:
+                return csv.reader(f).next()
+
+        table_name = form.name.data
+        filename = form.csv_file.data.filename
+
+        bucket_path = app.config['CSV_TO_HIVE_UPLOAD_BUCKET']
+
+        if not bucket_path:
+            logging.info('No upload bucket specified')
+            raise Exception(
+                'No upload bucket specified. You can specify one in the config 
file.')
+
+        upload_prefix = app.config['CSV_TO_HIVE_UPLOAD_DIRECTORY']
+        dest_path = os.path.join(table_name, filename)
+
+        upload_path = app.config['UPLOAD_FOLDER'] + \
+            secure_filename(form.csv_file.data.filename)
+        column_names = get_column_names(upload_path)
+        schema_definition = ', '.join(
+            [s + ' STRING ' for s in column_names])
+
+        s3 = boto3.client('s3')
+        location = os.path.join('s3a://', bucket_path, upload_prefix, 
table_name)
+        s3.upload_file(
+            upload_path, 'airbnb-superset',
+            os.path.join(upload_prefix, table_name, filename))
+        sql = """CREATE EXTERNAL TABLE {table_name} ( {schema_definition} )
+            ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS
+            TEXTFILE LOCATION '{location}'""".format(**locals())
+
+        logging.info(form.con.data)
+        engine = create_engine(form.con.data)
+        engine.execute(sql)
+
     @classmethod
     def convert_dttm(cls, target_type, dttm):
         tt = target_type.upper()
diff --git a/superset/forms.py b/superset/forms.py
new file mode 100644
index 0000000..a077904
--- /dev/null
+++ b/superset/forms.py
@@ -0,0 +1,123 @@
+"""Contains the logic to create cohesive forms on the explore view"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+from flask_appbuilder.forms import DynamicForm
+from flask_babel import lazy_gettext as _
+from flask_wtf.file import FileAllowed, FileField, FileRequired
+from wtforms import (
+    BooleanField, IntegerField, SelectField, StringField)
+from wtforms.validators import DataRequired, NumberRange, Optional
+
+from superset import app
+
+config = app.config
+
+
+class CsvToDatabaseForm(DynamicForm):
+    name = StringField(
+        _('Table Name'),
+        description=_('Name of table to be created from csv data.'),
+        validators=[DataRequired()],
+        widget=BS3TextFieldWidget())
+    csv_file = FileField(
+        _('CSV File'),
+        description=_('Select a CSV file to be uploaded to a database.'),
+        validators=[
+            FileRequired(), FileAllowed(['csv'], _('CSV Files Only!'))])
+
+    con = SelectField(
+        _('Database'),
+        description=_('database in which to add above table.'),
+        validators=[DataRequired()],
+        choices=[])
+    sep = StringField(
+        _('Delimiter'),
+        description=_('Delimiter used by CSV file (for whitespace use \s+).'),
+        validators=[DataRequired()],
+        widget=BS3TextFieldWidget())
+    if_exists = SelectField(
+        _('Table Exists'),
+        description=_(
+            'If table exists do one of the following: '
+            'Fail (do nothing), Replace (drop and recreate table) '
+            'or Append (insert data).'),
+        choices=[
+            ('fail', _('Fail')), ('replace', _('Replace')),
+            ('append', _('Append'))],
+        validators=[DataRequired()])
+
+    schema = StringField(
+        _('Schema'),
+        description=_('Specify a schema (if database flavour supports this).'),
+        validators=[Optional()],
+        widget=BS3TextFieldWidget(),
+        filters=[lambda x: x or None])
+    header = IntegerField(
+        _('Header Row'),
+        description=_(
+            'Row containing the headers to use as '
+            'column names (0 is first line of data). '
+            'Leave empty if there is no header row.'),
+        validators=[Optional()],
+        widget=BS3TextFieldWidget(),
+        filters=[lambda x: x or None])
+    index_col = IntegerField(
+        _('Index Column'),
+        description=_(
+            'Column to use as the row labels of the '
+            'dataframe. Leave empty if no index column.'),
+        validators=[Optional(), NumberRange(0, 1E+20)],
+        widget=BS3TextFieldWidget(),
+        filters=[lambda x: x or None])
+    mangle_dupe_cols = BooleanField(
+        _('Mangle Duplicate Columns'),
+        description=_('Specify duplicate columns as "X.0, X.1".'))
+    skipinitialspace = BooleanField(
+        _('Skip Initial Space'),
+        description=_('Skip spaces after delimiter.'))
+    skiprows = IntegerField(
+        _('Skip Rows'),
+        description=_('Number of rows to skip at start of file.'),
+        validators=[Optional(), NumberRange(0, 1E+20)],
+        widget=BS3TextFieldWidget(),
+        filters=[lambda x: x or None])
+    nrows = IntegerField(
+        _('Rows to Read'),
+        description=_('Number of rows of file to read.'),
+        validators=[Optional(), NumberRange(0, 1E+20)],
+        widget=BS3TextFieldWidget(),
+        filters=[lambda x: x or None])
+    skip_blank_lines = BooleanField(
+        _('Skip Blank Lines'),
+        description=_(
+            'Skip blank lines rather than interpreting them '
+            'as NaN values.'))
+    parse_dates = BooleanField(
+        _('Parse Dates'),
+        description=_('Parse date values.'))
+    infer_datetime_format = BooleanField(
+        _('Infer Datetime Format'),
+        description=_(
+            'Use Pandas to interpret the datetime format '
+            'automatically.'))
+    decimal = StringField(
+        _('Decimal Character'),
+        description=_('Character to interpret as decimal point.'),
+        validators=[Optional()],
+        widget=BS3TextFieldWidget(),
+        filters=[lambda x: x or '.'])
+    index = BooleanField(
+        _('Dataframe Index'),
+        description=_('Write dataframe index as a column.'))
+    index_label = StringField(
+        _('Column Label(s)'),
+        description=_(
+            'Column label for index column(s). If None is given '
+            'and Dataframe Index is True, Index Names are used.'),
+        validators=[Optional()],
+        widget=BS3TextFieldWidget(),
+        filters=[lambda x: x or None])
diff --git a/superset/views/core.py b/superset/views/core.py
index 261696c..a12721d 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -7,6 +7,7 @@ from collections import defaultdict
 from datetime import datetime, timedelta
 import json
 import logging
+import os
 import pickle
 import re
 import time
@@ -16,7 +17,7 @@ from urllib import parse
 from flask import (
     flash, g, Markup, redirect, render_template, request, Response, url_for,
 )
-from flask_appbuilder import expose
+from flask_appbuilder import expose, SimpleFormView
 from flask_appbuilder.actions import action
 from flask_appbuilder.models.sqla.interface import SQLAInterface
 from flask_appbuilder.security.decorators import has_access_api
@@ -28,12 +29,15 @@ import sqlalchemy as sqla
 from sqlalchemy import create_engine
 from sqlalchemy.engine.url import make_url
 from werkzeug.routing import BaseConverter
+from werkzeug.utils import secure_filename
 
 from superset import (
     app, appbuilder, cache, db, results_backend, security, sm, sql_lab, utils,
     viz,
 )
 from superset.connectors.connector_registry import ConnectorRegistry
+from superset.connectors.sqla.models import SqlaTable
+from superset.forms import CsvToDatabaseForm
 from superset.legacy import cast_form_data
 import superset.models.core as models
 from superset.models.sql_lab import Query
@@ -305,6 +309,71 @@ class DatabaseAsync(DatabaseView):
 appbuilder.add_view_no_menu(DatabaseAsync)
 
 
+class CsvToDatabaseView(SimpleFormView):
+    form = CsvToDatabaseForm
+    form_title = _('CSV to Database configuration')
+    add_columns = ['database', 'schema', 'table_name']
+
+    def form_get(self, form):
+        form.sep.data = ','
+        form.header.data = 0
+        form.mangle_dupe_cols.data = True
+        form.skipinitialspace.data = False
+        form.skip_blank_lines.data = True
+        form.parse_dates.data = True
+        form.infer_datetime_format.data = True
+        form.decimal.data = '.'
+        form.if_exists.data = 'append'
+        all_datasources = (
+            db.session.query(
+                models.Database.sqlalchemy_uri,
+                models.Database.database_name)
+            .all()
+        )
+        form.con.choices += all_datasources
+
+    def form_post(self, form):
+        def _upload_file(csv_file):
+            if csv_file and csv_file.filename:
+                filename = secure_filename(csv_file.filename)
+                csv_file.save(os.path.join(config['UPLOAD_FOLDER'], filename))
+                return filename
+
+        csv_file = form.csv_file.data
+        _upload_file(csv_file)
+        table = SqlaTable(table_name=form.name.data)
+        database = (
+            db.session.query(models.Database)
+            .filter_by(sqlalchemy_uri=form.data.get('con'))
+            .one()
+        )
+        table.database = database
+        table.database_id = database.id
+        try:
+            database.db_engine_spec.create_table_from_csv(form, table)
+        except Exception as e:
+            os.remove(os.path.join(config['UPLOAD_FOLDER'], csv_file.filename))
+            flash(e, 'error')
+            return redirect('/tablemodelview/list/')
+
+        os.remove(os.path.join(config['UPLOAD_FOLDER'], csv_file.filename))
+        # Go back to welcome page / splash screen
+        db_name = (
+            db.session.query(models.Database.database_name)
+            .filter_by(sqlalchemy_uri=form.data.get('con'))
+            .one()
+        )
+        message = _('CSV file "{0}" uploaded to table "{1}" in '
+                    'database "{2}"'.format(form.csv_file.data.filename,
+                                            form.name.data,
+                                            db_name[0]))
+        flash(message, 'info')
+        return redirect('/tablemodelview/list/')
+
+
+appbuilder.add_view_no_menu(CsvToDatabaseView)
+
+
 class DatabaseTablesAsync(DatabaseView):
     list_columns = ['id', 'all_table_names', 'all_schema_names']
 
@@ -2459,6 +2528,16 @@ appbuilder.add_link(
     category_label=__('SQL Lab'),
 )
 
+appbuilder.add_link(
+    'Upload a CSV',
+    label=__('Upload a CSV'),
+    href='/csvtodatabaseview/form',
+    icon='fa-upload',
+    category='Sources',
+    category_label=__('Sources'),
+    category_icon='fa-wrench',)
+appbuilder.add_separator('Sources')
+
 
 @app.after_request
 def apply_caching(response):
diff --git a/tests/core_tests.py b/tests/core_tests.py
index 4b32c10..e6381b7 100644
--- a/tests/core_tests.py
+++ b/tests/core_tests.py
@@ -10,7 +10,9 @@ import doctest
 import io
 import json
 import logging
+import os
 import random
+import string
 import unittest
 
 from flask import escape
@@ -789,6 +791,46 @@ class CoreTests(SupersetTestCase):
             {'name': ' NULL', 'sum__num': 0},
         )
 
+    def test_import_csv(self):
+        self.login(username='admin')
+        filename = 'testCSV.csv'
+        table_name = ''.join(
+            random.choice(string.ascii_uppercase) for _ in range(5))
+
+        test_file = open(filename, 'w+')
+        test_file.write('a,b\n')
+        test_file.write('john,1\n')
+        test_file.write('paul,2\n')
+        test_file.close()
+        main_db_uri = db.session.query(
+            models.Database.sqlalchemy_uri)\
+            .filter_by(database_name='main').all()
+
+        test_file = open(filename, 'rb')
+        form_data = {
+            'csv_file': test_file,
+            'sep': ',',
+            'name': table_name,
+            'con': main_db_uri[0][0],
+            'if_exists': 'append',
+            'index_label': 'test_label',
+            'mangle_dupe_cols': False}
+
+        url = '/databaseview/list/'
+        add_datasource_page = self.get_resp(url)
+        assert 'Upload a CSV' in add_datasource_page
+
+        url = '/csvtodatabaseview/form'
+        form_get = self.get_resp(url)
+        assert 'CSV to Database configuration' in form_get
+
+        try:
+            # ensure uploaded successfully
+            form_post = self.get_resp(url, data=form_data)
+            assert 'CSV file \"testCSV.csv\" uploaded to table' in form_post
+        finally:
+            os.remove(filename)
+
     def test_dataframe_timezone(self):
         tz = psycopg2.tz.FixedOffsetTimezone(offset=60, name=None)
         data = [(datetime.datetime(2017, 11, 18, 21, 53, 0, 219225, 
tzinfo=tz),),
diff --git a/tox.ini b/tox.ini
index 78198ea..b9b8c11 100644
--- a/tox.ini
+++ b/tox.ini
@@ -68,7 +68,7 @@ commands =
 [testenv:py27-mysql]
 basepython = python2.7
 setenv =
-    SUPERSET__SQLALCHEMY_DATABASE_URI = 
mysql://mysqluser:mysqluserpassword@localhost/superset?charset=utf8
+    SUPERSET__SQLALCHEMY_DATABASE_URI = 
mysql://root@localhost/superset?charset=utf8
 
 [testenv:py34-mysql]
 basepython = python3.4

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to