This is an automated email from the ASF dual-hosted git repository.
maximebeauchemin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git
The following commit(s) were added to refs/heads/master by this push:
new 268edcf Import CSV (#3643)
268edcf is described below
commit 268edcfedd0eae0dcb33bdd1e14795f602ab4532
Author: timifasubaa <[email protected]>
AuthorDate: Mon Nov 27 21:07:12 2017 -0800
Import CSV (#3643)
* add upload csv button to sources dropdown
* upload csv to non-hive datasources
* upload csv to hive datasource
* update FAQ page
* add tests
* fix linting errors and merge conflicts
* Update .travis.yml
* Update tox.ini
---
docs/faq.rst | 7 +++
superset/config.py | 16 +++++-
superset/db_engine_specs.py | 109 ++++++++++++++++++++++++++++++++++++++-
superset/forms.py | 123 ++++++++++++++++++++++++++++++++++++++++++++
superset/views/core.py | 81 ++++++++++++++++++++++++++++-
tests/core_tests.py | 42 +++++++++++++++
tox.ini | 2 +-
7 files changed, 375 insertions(+), 5 deletions(-)
diff --git a/docs/faq.rst b/docs/faq.rst
index 0ca341e..b6520b9 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -45,6 +45,13 @@ visualizations.
https://github.com/airbnb/superset/issues?q=label%3Aexample+is%3Aclosed
+Can I upload and visualize CSV data?
+-------------------------------------
+
+Yes, using the ``Upload a CSV`` button under the ``Sources``
+menu item. This brings up a form that allows you to specify required information.
+After creating the table from CSV, it can then be loaded like any other on the
+``Sources -> Tables`` page.
+
+
Why are my queries timing out?
------------------------------
diff --git a/superset/config.py b/superset/config.py
index ebc4777..0e0bd9b 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -58,9 +58,9 @@ SQLALCHEMY_TRACK_MODIFICATIONS = False
SECRET_KEY = '\2\1thisismyscretkey\1\2\e\y\y\h' # noqa
# The SQLAlchemy connection string.
-SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR, 'superset.db')
+# SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(DATA_DIR,
'superset.db')
# SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp'
-# SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
+SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'
# In order to hook up a custom password store for all SQLACHEMY connections
# implement a function that takes a single argument of type 'sqla.engine.url',
@@ -188,6 +188,10 @@ TABLE_NAMES_CACHE_CONFIG = {'CACHE_TYPE': 'null'}
ENABLE_CORS = False
CORS_OPTIONS = {}
+# Allowed format types for upload on Database view
+# TODO: Add processing of other spreadsheet formats (xls, xlsx etc)
+ALLOWED_EXTENSIONS = set(['csv'])
+
# CSV Options: key/value pairs that will be passed as argument to
DataFrame.to_csv method
# note: index option should not be overridden
CSV_EXPORT = {
@@ -298,6 +302,14 @@ SQLLAB_ASYNC_TIME_LIMIT_SEC = 60 * 60 * 6
# in SQL Lab by using the "Run Async" button/feature
RESULTS_BACKEND = None
+# The S3 bucket where you want to store your external hive tables created
+# from CSV files. For example, 'companyname-superset'
+CSV_TO_HIVE_UPLOAD_S3_BUCKET = None
+
+# The directory within the bucket specified above that will
+# contain all the external tables
+CSV_TO_HIVE_UPLOAD_DIRECTORY = 'EXTERNAL_HIVE_TABLES/'
+
# A dictionary of items that gets merged into the Jinja context for
# SQL Lab. The existing context gets updated with this dictionary,
# meaning values for existing keys get overwritten by the content of this
diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py
index ba20cce..51fa055 100644
--- a/superset/db_engine_specs.py
+++ b/superset/db_engine_specs.py
@@ -17,21 +17,30 @@ from __future__ import print_function
from __future__ import unicode_literals
from collections import defaultdict, namedtuple
+import csv
import inspect
import logging
+import os
import re
import textwrap
import time
+import boto3
+from flask import g
from flask_babel import lazy_gettext as _
+import pandas
from sqlalchemy import select
+from sqlalchemy.engine import create_engine
from sqlalchemy.engine.url import make_url
from sqlalchemy.sql import text
import sqlparse
+from werkzeug.utils import secure_filename
-from superset import cache_util, conf, utils
+from superset import app, cache_util, conf, db, utils
from superset.utils import QueryStatus, SupersetTemplateException
+config = app.config
+
tracking_url_trans = conf.get('TRACKING_URL_TRANSFORMER')
Grain = namedtuple('Grain', 'name label function')
@@ -73,6 +82,65 @@ class BaseEngineSpec(object):
"""Returns engine-specific table metadata"""
return {}
+ @staticmethod
+ def csv_to_df(**kwargs):
+ kwargs['filepath_or_buffer'] = \
+ app.config['UPLOAD_FOLDER'] + kwargs['filepath_or_buffer']
+ kwargs['encoding'] = 'utf-8'
+ kwargs['iterator'] = True
+ chunks = pandas.read_csv(**kwargs)
+ df = pandas.DataFrame()
+ df = pandas.concat(chunk for chunk in chunks)
+ return df
+
+ @staticmethod
+ def df_to_db(df, table, **kwargs):
+ df.to_sql(**kwargs)
+ table.user_id = g.user.id
+ table.schema = kwargs['schema']
+ table.fetch_metadata()
+ db.session.add(table)
+ db.session.commit()
+
+ @staticmethod
+ def create_table_from_csv(form, table):
+ def _allowed_file(filename):
+ # Only allow specific file extensions as specified in the config
+ extension = os.path.splitext(filename)[1]
+ return extension and extension[1:] in
app.config['ALLOWED_EXTENSIONS']
+
+ filename = secure_filename(form.csv_file.data.filename)
+ if not _allowed_file(filename):
+ raise Exception('Invalid file type selected')
+ kwargs = {
+ 'filepath_or_buffer': filename,
+ 'sep': form.sep.data,
+ 'header': form.header.data if form.header.data else 0,
+ 'index_col': form.index_col.data,
+ 'mangle_dupe_cols': form.mangle_dupe_cols.data,
+ 'skipinitialspace': form.skipinitialspace.data,
+ 'skiprows': form.skiprows.data,
+ 'nrows': form.nrows.data,
+ 'skip_blank_lines': form.skip_blank_lines.data,
+ 'parse_dates': form.parse_dates.data,
+ 'infer_datetime_format': form.infer_datetime_format.data,
+ 'chunksize': 10000,
+ }
+ df = BaseEngineSpec.csv_to_df(**kwargs)
+
+ df_to_db_kwargs = {
+ 'table': table,
+ 'df': df,
+ 'name': form.name.data,
+ 'con': create_engine(form.con.data, echo=False),
+ 'schema': form.schema.data,
+ 'if_exists': form.if_exists.data,
+ 'index': form.index.data,
+ 'index_label': form.index_label.data,
+ 'chunksize': 10000,
+ }
+ BaseEngineSpec.df_to_db(**df_to_db_kwargs)
+
@classmethod
def escape_sql(cls, sql):
"""Escapes the raw SQL"""
@@ -721,6 +789,45 @@ class HiveEngineSpec(PrestoEngineSpec):
return BaseEngineSpec.fetch_result_sets(
db, datasource_type, force=force)
+ @staticmethod
+ def create_table_from_csv(form, table):
+ """Uploads a csv file and creates a superset datasource in Hive."""
+ def get_column_names(filepath):
+ with open(filepath, 'rb') as f:
+ return csv.reader(f).next()
+
+ table_name = form.name.data
+ filename = form.csv_file.data.filename
+
+ bucket_path = app.config['CSV_TO_HIVE_UPLOAD_BUCKET']
+
+ if not bucket_path:
+ logging.info('No upload bucket specified')
+ raise Exception(
+ 'No upload bucket specified. You can specify one in the config
file.')
+
+ upload_prefix = app.config['CSV_TO_HIVE_UPLOAD_DIRECTORY']
+ dest_path = os.path.join(table_name, filename)
+
+ upload_path = app.config['UPLOAD_FOLDER'] + \
+ secure_filename(form.csv_file.data.filename)
+ column_names = get_column_names(upload_path)
+ schema_definition = ', '.join(
+ [s + ' STRING ' for s in column_names])
+
+ s3 = boto3.client('s3')
+ location = os.path.join('s3a://', bucket_path, upload_prefix,
table_name)
+ s3.upload_file(
+ upload_path, 'airbnb-superset',
+ os.path.join(upload_prefix, table_name, filename))
+ sql = """CREATE EXTERNAL TABLE {table_name} ( {schema_definition} )
+ ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS
+ TEXTFILE LOCATION '{location}'""".format(**locals())
+
+ logging.info(form.con.data)
+ engine = create_engine(form.con.data)
+ engine.execute(sql)
+
@classmethod
def convert_dttm(cls, target_type, dttm):
tt = target_type.upper()
diff --git a/superset/forms.py b/superset/forms.py
new file mode 100644
index 0000000..a077904
--- /dev/null
+++ b/superset/forms.py
@@ -0,0 +1,123 @@
+"""Contains the logic to create cohesive forms on the explore view"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+from flask_appbuilder.forms import DynamicForm
+from flask_babel import lazy_gettext as _
+from flask_wtf.file import FileAllowed, FileField, FileRequired
+from wtforms import (
+ BooleanField, IntegerField, SelectField, StringField)
+from wtforms.validators import DataRequired, NumberRange, Optional
+
+from superset import app
+
+config = app.config
+
+
+class CsvToDatabaseForm(DynamicForm):
+ name = StringField(
+ _('Table Name'),
+ description=_('Name of table to be created from csv data.'),
+ validators=[DataRequired()],
+ widget=BS3TextFieldWidget())
+ csv_file = FileField(
+ _('CSV File'),
+ description=_('Select a CSV file to be uploaded to a database.'),
+ validators=[
+ FileRequired(), FileAllowed(['csv'], _('CSV Files Only!'))])
+
+ con = SelectField(
+ _('Database'),
+ description=_('database in which to add above table.'),
+ validators=[DataRequired()],
+ choices=[])
+ sep = StringField(
+ _('Delimiter'),
+ description=_('Delimiter used by CSV file (for whitespace use \s+).'),
+ validators=[DataRequired()],
+ widget=BS3TextFieldWidget())
+ if_exists = SelectField(
+ _('Table Exists'),
+ description=_(
+ 'If table exists do one of the following: '
+ 'Fail (do nothing), Replace (drop and recreate table) '
+ 'or Append (insert data).'),
+ choices=[
+ ('fail', _('Fail')), ('replace', _('Replace')),
+ ('append', _('Append'))],
+ validators=[DataRequired()])
+
+ schema = StringField(
+ _('Schema'),
+ description=_('Specify a schema (if database flavour supports this).'),
+ validators=[Optional()],
+ widget=BS3TextFieldWidget(),
+ filters=[lambda x: x or None])
+ header = IntegerField(
+ _('Header Row'),
+ description=_(
+ 'Row containing the headers to use as '
+ 'column names (0 is first line of data). '
+ 'Leave empty if there is no header row.'),
+ validators=[Optional()],
+ widget=BS3TextFieldWidget(),
+ filters=[lambda x: x or None])
+ index_col = IntegerField(
+ _('Index Column'),
+ description=_(
+ 'Column to use as the row labels of the '
+ 'dataframe. Leave empty if no index column.'),
+ validators=[Optional(), NumberRange(0, 1E+20)],
+ widget=BS3TextFieldWidget(),
+ filters=[lambda x: x or None])
+ mangle_dupe_cols = BooleanField(
+ _('Mangle Duplicate Columns'),
+ description=_('Specify duplicate columns as "X.0, X.1".'))
+ skipinitialspace = BooleanField(
+ _('Skip Initial Space'),
+ description=_('Skip spaces after delimiter.'))
+ skiprows = IntegerField(
+ _('Skip Rows'),
+ description=_('Number of rows to skip at start of file.'),
+ validators=[Optional(), NumberRange(0, 1E+20)],
+ widget=BS3TextFieldWidget(),
+ filters=[lambda x: x or None])
+ nrows = IntegerField(
+ _('Rows to Read'),
+ description=_('Number of rows of file to read.'),
+ validators=[Optional(), NumberRange(0, 1E+20)],
+ widget=BS3TextFieldWidget(),
+ filters=[lambda x: x or None])
+ skip_blank_lines = BooleanField(
+ _('Skip Blank Lines'),
+ description=_(
+ 'Skip blank lines rather than interpreting them '
+ 'as NaN values.'))
+ parse_dates = BooleanField(
+ _('Parse Dates'),
+ description=_('Parse date values.'))
+ infer_datetime_format = BooleanField(
+ _('Infer Datetime Format'),
+ description=_(
+ 'Use Pandas to interpret the datetime format '
+ 'automatically.'))
+ decimal = StringField(
+ _('Decimal Character'),
+ description=_('Character to interpret as decimal point.'),
+ validators=[Optional()],
+ widget=BS3TextFieldWidget(),
+ filters=[lambda x: x or '.'])
+ index = BooleanField(
+ _('Dataframe Index'),
+ description=_('Write dataframe index as a column.'))
+ index_label = StringField(
+ _('Column Label(s)'),
+ description=_(
+ 'Column label for index column(s). If None is given '
+ 'and Dataframe Index is True, Index Names are used.'),
+ validators=[Optional()],
+ widget=BS3TextFieldWidget(),
+ filters=[lambda x: x or None])
diff --git a/superset/views/core.py b/superset/views/core.py
index 261696c..a12721d 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -7,6 +7,7 @@ from collections import defaultdict
from datetime import datetime, timedelta
import json
import logging
+import os
import pickle
import re
import time
@@ -16,7 +17,7 @@ from urllib import parse
from flask import (
flash, g, Markup, redirect, render_template, request, Response, url_for,
)
-from flask_appbuilder import expose
+from flask_appbuilder import expose, SimpleFormView
from flask_appbuilder.actions import action
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_appbuilder.security.decorators import has_access_api
@@ -28,12 +29,15 @@ import sqlalchemy as sqla
from sqlalchemy import create_engine
from sqlalchemy.engine.url import make_url
from werkzeug.routing import BaseConverter
+from werkzeug.utils import secure_filename
from superset import (
app, appbuilder, cache, db, results_backend, security, sm, sql_lab, utils,
viz,
)
from superset.connectors.connector_registry import ConnectorRegistry
+from superset.connectors.sqla.models import SqlaTable
+from superset.forms import CsvToDatabaseForm
from superset.legacy import cast_form_data
import superset.models.core as models
from superset.models.sql_lab import Query
@@ -305,6 +309,71 @@ class DatabaseAsync(DatabaseView):
appbuilder.add_view_no_menu(DatabaseAsync)
+class CsvToDatabaseView(SimpleFormView):
+ form = CsvToDatabaseForm
+ form_title = _('CSV to Database configuration')
+ add_columns = ['database', 'schema', 'table_name']
+
+ def form_get(self, form):
+ form.sep.data = ','
+ form.header.data = 0
+ form.mangle_dupe_cols.data = True
+ form.skipinitialspace.data = False
+ form.skip_blank_lines.data = True
+ form.parse_dates.data = True
+ form.infer_datetime_format.data = True
+ form.decimal.data = '.'
+ form.if_exists.data = 'append'
+ all_datasources = (
+ db.session.query(
+ models.Database.sqlalchemy_uri,
+ models.Database.database_name)
+ .all()
+ )
+ form.con.choices += all_datasources
+
+ def form_post(self, form):
+ def _upload_file(csv_file):
+ if csv_file and csv_file.filename:
+ filename = secure_filename(csv_file.filename)
+ csv_file.save(os.path.join(config['UPLOAD_FOLDER'], filename))
+ return filename
+
+ csv_file = form.csv_file.data
+ _upload_file(csv_file)
+ table = SqlaTable(table_name=form.name.data)
+ database = (
+ db.session.query(models.Database)
+ .filter_by(sqlalchemy_uri=form.data.get('con'))
+ .one()
+ )
+ table.database = database
+ table.database_id = database.id
+ try:
+ database.db_engine_spec.create_table_from_csv(form, table)
+ except Exception as e:
+ os.remove(os.path.join(config['UPLOAD_FOLDER'], csv_file.filename))
+ flash(e, 'error')
+ return redirect('/tablemodelview/list/')
+
+ os.remove(os.path.join(config['UPLOAD_FOLDER'], csv_file.filename))
+ # Go back to welcome page / splash screen
+ db_name = (
+ db.session.query(models.Database.database_name)
+ .filter_by(sqlalchemy_uri=form.data.get('con'))
+ .one()
+ )
+ message = _('CSV file "{0}" uploaded to table "{1}" in '
+ 'database "{2}"'.format(form.csv_file.data.filename,
+ form.name.data,
+ db_name[0]))
+ flash(message, 'info')
+ return redirect('/tablemodelview/list/')
+
+
+appbuilder.add_view_no_menu(CsvToDatabaseView)
+
+
class DatabaseTablesAsync(DatabaseView):
list_columns = ['id', 'all_table_names', 'all_schema_names']
@@ -2459,6 +2528,16 @@ appbuilder.add_link(
category_label=__('SQL Lab'),
)
+appbuilder.add_link(
+ 'Upload a CSV',
+ label=__('Upload a CSV'),
+ href='/csvtodatabaseview/form',
+ icon='fa-upload',
+ category='Sources',
+ category_label=__('Sources'),
+ category_icon='fa-wrench',)
+appbuilder.add_separator('Sources')
+
@app.after_request
def apply_caching(response):
diff --git a/tests/core_tests.py b/tests/core_tests.py
index 4b32c10..e6381b7 100644
--- a/tests/core_tests.py
+++ b/tests/core_tests.py
@@ -10,7 +10,9 @@ import doctest
import io
import json
import logging
+import os
import random
+import string
import unittest
from flask import escape
@@ -789,6 +791,46 @@ class CoreTests(SupersetTestCase):
{'name': ' NULL', 'sum__num': 0},
)
+ def test_import_csv(self):
+ self.login(username='admin')
+ filename = 'testCSV.csv'
+ table_name = ''.join(
+ random.choice(string.ascii_uppercase) for _ in range(5))
+
+ test_file = open(filename, 'w+')
+ test_file.write('a,b\n')
+ test_file.write('john,1\n')
+ test_file.write('paul,2\n')
+ test_file.close()
+ main_db_uri = db.session.query(
+ models.Database.sqlalchemy_uri)\
+ .filter_by(database_name='main').all()
+
+ test_file = open(filename, 'rb')
+ form_data = {
+ 'csv_file': test_file,
+ 'sep': ',',
+ 'name': table_name,
+ 'con': main_db_uri[0][0],
+ 'if_exists': 'append',
+ 'index_label': 'test_label',
+ 'mangle_dupe_cols': False}
+
+ url = '/databaseview/list/'
+ add_datasource_page = self.get_resp(url)
+ assert 'Upload a CSV' in add_datasource_page
+
+ url = '/csvtodatabaseview/form'
+ form_get = self.get_resp(url)
+ assert 'CSV to Database configuration' in form_get
+
+ try:
+ # ensure uploaded successfully
+ form_post = self.get_resp(url, data=form_data)
+ assert 'CSV file \"testCSV.csv\" uploaded to table' in form_post
+ finally:
+ os.remove(filename)
+
def test_dataframe_timezone(self):
tz = psycopg2.tz.FixedOffsetTimezone(offset=60, name=None)
data = [(datetime.datetime(2017, 11, 18, 21, 53, 0, 219225,
tzinfo=tz),),
diff --git a/tox.ini b/tox.ini
index 78198ea..b9b8c11 100644
--- a/tox.ini
+++ b/tox.ini
@@ -68,7 +68,7 @@ commands =
[testenv:py27-mysql]
basepython = python2.7
setenv =
- SUPERSET__SQLALCHEMY_DATABASE_URI =
mysql://mysqluser:mysqluserpassword@localhost/superset?charset=utf8
+ SUPERSET__SQLALCHEMY_DATABASE_URI =
mysql://root@localhost/superset?charset=utf8
[testenv:py34-mysql]
basepython = python3.4
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].