This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 4132d8f  Simplify and improve `expand_data` in Presto (#8233)
4132d8f is described below

commit 4132d8fb0f7d79556e4ecf6199e0611da5303aea
Author: Beto Dealmeida <[email protected]>
AuthorDate: Tue Sep 17 14:04:14 2019 -0700

    Simplify and improve `expand_data` in Presto (#8233)
    
    * WIP
    
    * Working version, needs cleanup
    
    * Add unit tests to split
    
    * Small fixes
    
    * Dedupe array columns
    
    * Fix lint
---
 superset/db_engine_specs/presto.py | 151 ++++++++++++++++++++++++++++---------
 superset/utils/core.py             |  34 ++++++++-
 tests/db_engine_specs_test.py      | 130 ++++++++++++++++++++++---------
 tests/utils_tests.py               |  15 ++++
 4 files changed, 258 insertions(+), 72 deletions(-)

diff --git a/superset/db_engine_specs/presto.py 
b/superset/db_engine_specs/presto.py
index 13e574d..345d5a4 100644
--- a/superset/db_engine_specs/presto.py
+++ b/superset/db_engine_specs/presto.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=C,R,W
-from collections import OrderedDict
+from collections import defaultdict, deque, OrderedDict
 from datetime import datetime
 from distutils.version import StrictVersion
 import logging
@@ -58,6 +58,51 @@ pandas_dtype_map = {
 }
 
 
+def get_children(column: Dict[str, str]) -> List[Dict[str, str]]:
+    """
+    Get the children of a complex Presto type (row or array).
+
+    For arrays, we return a single list with the base type:
+
+        >>> get_children(dict(name="a", type="ARRAY(BIGINT)"))
+        [{"name": "a", "type": "BIGINT"}]
+
+    For rows, we return a list of the columns:
+
+        >>> get_children(dict(name="a", type="ROW(BIGINT,FOO VARCHAR)"))
+        [{'name': 'a._col0', 'type': 'BIGINT'}, {'name': 'a.foo', 'type': 
'VARCHAR'}]
+
+    :param column: dictionary representing a Presto column
+    :return: list of dictionaries representing children columns
+    """
+    pattern = re.compile("(?P<type>\w+)\((?P<children>.*)\)")
+    match = pattern.match(column["type"])
+    if not match:
+        raise Exception(f"Unable to parse column type {column['type']}")
+
+    group = match.groupdict()
+    type_ = group["type"].upper()
+    children_type = group["children"]
+    if type_ == "ARRAY":
+        return [{"name": column["name"], "type": children_type}]
+    elif type_ == "ROW":
+        nameless_columns = 0
+        columns = []
+        for child in utils.split(children_type, ","):
+            parts = list(utils.split(child.strip(), " "))
+            if len(parts) == 2:
+                name, type_ = parts
+                name = name.strip('"')
+            else:
+                name = f"_col{nameless_columns}"
+                type_ = parts[0]
+                nameless_columns += 1
+            columns.append({"name": f"{column['name']}.{name.lower()}", 
"type": type_})
+        return columns
+    else:
+        raise Exception(f"Unknown type {type_}!")
+
+
 class PrestoEngineSpec(BaseEngineSpec):
     engine = "presto"
 
@@ -846,43 +891,79 @@ class PrestoEngineSpec(BaseEngineSpec):
         if not is_feature_enabled("PRESTO_EXPAND_DATA"):
             return columns, data, []
 
+        # insert a custom column that tracks the original row
+        columns.insert(0, {"name": "__row_id", "type": "BIGINT"})
+        for i, row in enumerate(data):
+            row["__row_id"] = i
+
+        # process each column, unnesting ARRAY types and expanding ROW types 
into new columns
+        to_process = deque((column, 0) for column in columns)
         all_columns: List[dict] = []
-        # Get the list of all columns (selected fields and their nested fields)
-        for column in columns:
-            if column["type"].startswith("ARRAY") or 
column["type"].startswith("ROW"):
-                cls._parse_structural_column(
-                    column["name"], column["type"].lower(), all_columns
-                )
-            else:
+        expanded_columns = []
+        current_array_level = None
+        while to_process:
+            column, level = to_process.popleft()
+            if column["name"] not in [column["name"] for column in 
all_columns]:
                 all_columns.append(column)
 
-        # Build graphs where the root node is a row or array and its children 
are that
-        # column's nested fields
-        row_column_hierarchy, array_column_hierarchy, expanded_columns = 
cls._create_row_and_array_hierarchy(
-            columns
-        )
-
-        # Pull out a row's nested fields and their values into separate columns
-        ordered_row_columns = row_column_hierarchy.keys()
-        for datum in data:
-            for row_column in ordered_row_columns:
-                cls._expand_row_data(datum, row_column, row_column_hierarchy)
-
-        while array_column_hierarchy:
-            array_columns = list(array_column_hierarchy.keys())
-            # Determine what columns are ready to be processed.
-            array_columns_to_process, unprocessed_array_columns = 
cls._split_array_columns_by_process_state(
-                array_columns, array_column_hierarchy, data[0]
-            )
-            all_array_data = cls._process_array_data(
-                data, all_columns, array_column_hierarchy
-            )
-            # Consolidate the original data set and the expanded array data
-            cls._consolidate_array_data_into_data(data, all_array_data)
-            # Remove processed array columns from the graph
-            cls._remove_processed_array_columns(
-                unprocessed_array_columns, array_column_hierarchy
-            )
+            # When unnesting arrays we need to keep track of how many extra 
rows
+            # were added, for each original row. This is necessary when we 
expand multiple
+            # arrays, so that the arrays after the first reuse the rows added 
by
+            # the first. every time we change a level in the nested arrays we
+            # reinitialize this.
+            if level != current_array_level:
+                unnested_rows: Dict[int, int] = defaultdict(int)
+                current_array_level = level
+
+            name = column["name"]
+
+            if column["type"].startswith("ARRAY("):
+                # keep processing array children; we append to the right so 
that
+                # multiple nested arrays are processed breadth-first
+                to_process.append((get_children(column)[0], level + 1))
+
+                # unnest array objects data into new rows
+                i = 0
+                while i < len(data):
+                    row = data[i]
+                    values = row.get(name)
+                    if values:
+                        # how many extra rows we need to unnest the data?
+                        extra_rows = len(values) - 1
+
+                        # how many rows were already added for this row?
+                        current_unnested_rows = unnested_rows[i]
+
+                        # add any necessary rows
+                        missing = extra_rows - current_unnested_rows
+                        for _ in range(missing):
+                            data.insert(i + current_unnested_rows + 1, {})
+                            unnested_rows[i] += 1
+
+                        # unnest array into rows
+                        for j, value in enumerate(values):
+                            data[i + j][name] = value
+
+                        # skip newly unnested rows
+                        i += unnested_rows[i]
+
+                    i += 1
+
+            if column["type"].startswith("ROW("):
+                # expand columns; we append them to the left so they are added
+                # immediately after the parent
+                expanded = get_children(column)
+                to_process.extendleft((column, level) for column in expanded)
+                expanded_columns.extend(expanded)
+
+                # expand row objects into new columns
+                for row in data:
+                    for value, col in zip(row.get(name) or [], expanded):
+                        row[col["name"]] = value
+
+        data = [
+            {k["name"]: row.get(k["name"], "") for k in all_columns} for row 
in data
+        ]
 
         return all_columns, data, expanded_columns
 
diff --git a/superset/utils/core.py b/superset/utils/core.py
index 55ada27..7a74eb4 100644
--- a/superset/utils/core.py
+++ b/superset/utils/core.py
@@ -32,7 +32,7 @@ import signal
 import smtplib
 from time import struct_time
 import traceback
-from typing import List, NamedTuple, Optional, Tuple, Union
+from typing import Iterator, List, NamedTuple, Optional, Tuple, Union
 from urllib.parse import unquote_plus
 import uuid
 import zlib
@@ -1194,3 +1194,35 @@ class DatasourceName(NamedTuple):
 def get_stacktrace():
     if current_app.config.get("SHOW_STACKTRACE"):
         return traceback.format_exc()
+
+
+def split(
+    s: str, delimiter: str = " ", quote: str = '"', escaped_quote: str = r"\""
+) -> Iterator[str]:
+    """
+    A split function that is aware of quotes and parentheses.
+
+    :param s: string to split
+    :param delimiter: string defining where to split, usually a comma or space
+    :param quote: string, either a single or a double quote
+    :param escaped_quote: string representing an escaped quote
+    :return: list of strings
+    """
+    parens = 0
+    quotes = False
+    i = 0
+    for j, c in enumerate(s):
+        complete = parens == 0 and not quotes
+        if complete and c == delimiter:
+            yield s[i:j]
+            i = j + len(delimiter)
+        elif c == "(":
+            parens += 1
+        elif c == ")":
+            parens -= 1
+        elif c == quote:
+            if quotes and s[j - len(escaped_quote) + 1 : j + 1] != 
escaped_quote:
+                quotes = False
+            elif not quotes:
+                quotes = True
+    yield s[i:]
diff --git a/tests/db_engine_specs_test.py b/tests/db_engine_specs_test.py
index a6db8a1..2c6e3b3 100644
--- a/tests/db_engine_specs_test.py
+++ b/tests/db_engine_specs_test.py
@@ -653,18 +653,51 @@ class DbEngineSpecsTestCase(SupersetTestCase):
             cols, data
         )
         expected_cols = [
-            {"name": "row_column", "type": "ROW"},
+            {"name": "__row_id", "type": "BIGINT"},
+            {"name": "row_column", "type": "ROW(NESTED_OBJ VARCHAR)"},
             {"name": "row_column.nested_obj", "type": "VARCHAR"},
-            {"name": "array_column", "type": "ARRAY"},
+            {"name": "array_column", "type": "ARRAY(BIGINT)"},
         ]
+
         expected_data = [
-            {"row_column": ["a"], "row_column.nested_obj": "a", 
"array_column": 1},
-            {"row_column": "", "row_column.nested_obj": "", "array_column": 2},
-            {"row_column": "", "row_column.nested_obj": "", "array_column": 3},
-            {"row_column": ["b"], "row_column.nested_obj": "b", 
"array_column": 4},
-            {"row_column": "", "row_column.nested_obj": "", "array_column": 5},
-            {"row_column": "", "row_column.nested_obj": "", "array_column": 6},
+            {
+                "__row_id": 0,
+                "array_column": 1,
+                "row_column": ["a"],
+                "row_column.nested_obj": "a",
+            },
+            {
+                "__row_id": "",
+                "array_column": 2,
+                "row_column": "",
+                "row_column.nested_obj": "",
+            },
+            {
+                "__row_id": "",
+                "array_column": 3,
+                "row_column": "",
+                "row_column.nested_obj": "",
+            },
+            {
+                "__row_id": 1,
+                "array_column": 4,
+                "row_column": ["b"],
+                "row_column.nested_obj": "b",
+            },
+            {
+                "__row_id": "",
+                "array_column": 5,
+                "row_column": "",
+                "row_column.nested_obj": "",
+            },
+            {
+                "__row_id": "",
+                "array_column": 6,
+                "row_column": "",
+                "row_column.nested_obj": "",
+            },
         ]
+
         expected_expanded_cols = [{"name": "row_column.nested_obj", "type": 
"VARCHAR"}]
         self.assertEqual(actual_cols, expected_cols)
         self.assertEqual(actual_data, expected_data)
@@ -677,7 +710,7 @@ class DbEngineSpecsTestCase(SupersetTestCase):
         cols = [
             {
                 "name": "row_column",
-                "type": "ROW(NESTED_OBJ1 VARCHAR, NESTED_ROW ROW(NESTED_OBJ2 
VARCHAR)",
+                "type": "ROW(NESTED_OBJ1 VARCHAR, NESTED_ROW ROW(NESTED_OBJ2 
VARCHAR))",
             }
         ]
         data = [{"row_column": ["a1", ["a2"]]}, {"row_column": ["b1", ["b2"]]}]
@@ -685,28 +718,35 @@ class DbEngineSpecsTestCase(SupersetTestCase):
             cols, data
         )
         expected_cols = [
-            {"name": "row_column", "type": "ROW"},
-            {"name": "row_column.nested_obj1", "type": "VARCHAR"},
-            {"name": "row_column.nested_row", "type": "ROW"},
+            {"name": "__row_id", "type": "BIGINT"},
+            {
+                "name": "row_column",
+                "type": "ROW(NESTED_OBJ1 VARCHAR, NESTED_ROW ROW(NESTED_OBJ2 
VARCHAR))",
+            },
+            {"name": "row_column.nested_row", "type": "ROW(NESTED_OBJ2 
VARCHAR)"},
             {"name": "row_column.nested_row.nested_obj2", "type": "VARCHAR"},
+            {"name": "row_column.nested_obj1", "type": "VARCHAR"},
         ]
         expected_data = [
             {
+                "__row_id": 0,
                 "row_column": ["a1", ["a2"]],
                 "row_column.nested_obj1": "a1",
                 "row_column.nested_row": ["a2"],
                 "row_column.nested_row.nested_obj2": "a2",
             },
             {
+                "__row_id": 1,
                 "row_column": ["b1", ["b2"]],
                 "row_column.nested_obj1": "b1",
                 "row_column.nested_row": ["b2"],
                 "row_column.nested_row.nested_obj2": "b2",
             },
         ]
+
         expected_expanded_cols = [
             {"name": "row_column.nested_obj1", "type": "VARCHAR"},
-            {"name": "row_column.nested_row", "type": "ROW"},
+            {"name": "row_column.nested_row", "type": "ROW(NESTED_OBJ2 
VARCHAR)"},
             {"name": "row_column.nested_row.nested_obj2", "type": "VARCHAR"},
         ]
         self.assertEqual(actual_cols, expected_cols)
@@ -732,63 +772,81 @@ class DbEngineSpecsTestCase(SupersetTestCase):
             cols, data
         )
         expected_cols = [
+            {"name": "__row_id", "type": "BIGINT"},
             {"name": "int_column", "type": "BIGINT"},
-            {"name": "array_column", "type": "ARRAY"},
-            {"name": "array_column.nested_array", "type": "ARRAY"},
+            {
+                "name": "array_column",
+                "type": "ARRAY(ROW(NESTED_ARRAY ARRAY(ROW(NESTED_OBJ 
VARCHAR))))",
+            },
+            {
+                "name": "array_column.nested_array",
+                "type": "ARRAY(ROW(NESTED_OBJ VARCHAR))",
+            },
             {"name": "array_column.nested_array.nested_obj", "type": 
"VARCHAR"},
         ]
         expected_data = [
             {
-                "int_column": 1,
-                "array_column": [[[["a"], ["b"]]], [[["c"], ["d"]]]],
-                "array_column.nested_array": [["a"], ["b"]],
+                "__row_id": 0,
+                "array_column": [[["a"], ["b"]]],
+                "array_column.nested_array": ["a"],
                 "array_column.nested_array.nested_obj": "a",
+                "int_column": 1,
             },
             {
-                "int_column": "",
+                "__row_id": "",
                 "array_column": "",
-                "array_column.nested_array": "",
+                "array_column.nested_array": ["b"],
                 "array_column.nested_array.nested_obj": "b",
+                "int_column": "",
             },
             {
-                "int_column": "",
-                "array_column": "",
-                "array_column.nested_array": [["c"], ["d"]],
+                "__row_id": "",
+                "array_column": [[["c"], ["d"]]],
+                "array_column.nested_array": ["c"],
                 "array_column.nested_array.nested_obj": "c",
+                "int_column": "",
             },
             {
-                "int_column": "",
+                "__row_id": "",
                 "array_column": "",
-                "array_column.nested_array": "",
+                "array_column.nested_array": ["d"],
                 "array_column.nested_array.nested_obj": "d",
+                "int_column": "",
             },
             {
-                "int_column": 2,
-                "array_column": [[[["e"], ["f"]]], [[["g"], ["h"]]]],
-                "array_column.nested_array": [["e"], ["f"]],
+                "__row_id": 1,
+                "array_column": [[["e"], ["f"]]],
+                "array_column.nested_array": ["e"],
                 "array_column.nested_array.nested_obj": "e",
+                "int_column": 2,
             },
             {
-                "int_column": "",
+                "__row_id": "",
                 "array_column": "",
-                "array_column.nested_array": "",
+                "array_column.nested_array": ["f"],
                 "array_column.nested_array.nested_obj": "f",
+                "int_column": "",
             },
             {
-                "int_column": "",
-                "array_column": "",
-                "array_column.nested_array": [["g"], ["h"]],
+                "__row_id": "",
+                "array_column": [[["g"], ["h"]]],
+                "array_column.nested_array": ["g"],
                 "array_column.nested_array.nested_obj": "g",
+                "int_column": "",
             },
             {
-                "int_column": "",
+                "__row_id": "",
                 "array_column": "",
-                "array_column.nested_array": "",
+                "array_column.nested_array": ["h"],
                 "array_column.nested_array.nested_obj": "h",
+                "int_column": "",
             },
         ]
         expected_expanded_cols = [
-            {"name": "array_column.nested_array", "type": "ARRAY"},
+            {
+                "name": "array_column.nested_array",
+                "type": "ARRAY(ROW(NESTED_OBJ VARCHAR))",
+            },
             {"name": "array_column.nested_array.nested_obj", "type": 
"VARCHAR"},
         ]
         self.assertEqual(actual_cols, expected_cols)
diff --git a/tests/utils_tests.py b/tests/utils_tests.py
index df11cf5..5efd44d 100644
--- a/tests/utils_tests.py
+++ b/tests/utils_tests.py
@@ -45,6 +45,7 @@ from superset.utils.core import (
     parse_js_uri_path_item,
     parse_past_timedelta,
     setup_cache,
+    split,
     validate_json,
     zlib_compress,
     zlib_decompress,
@@ -832,6 +833,20 @@ class UtilsTestCase(unittest.TestCase):
                 stacktrace = get_stacktrace()
                 assert stacktrace is None
 
+    def test_split(self):
+        self.assertEqual(list(split("a b")), ["a", "b"])
+        self.assertEqual(list(split("a,b", delimiter=",")), ["a", "b"])
+        self.assertEqual(list(split("a,(b,a)", delimiter=",")), ["a", "(b,a)"])
+        self.assertEqual(
+            list(split('a,(b,a),"foo , bar"', delimiter=",")),
+            ["a", "(b,a)", '"foo , bar"'],
+        )
+        self.assertEqual(
+            list(split("a,'b,c'", delimiter=",", quote="'")), ["a", "'b,c'"]
+        )
+        self.assertEqual(list(split('a "b c"')), ["a", '"b c"'])
+        self.assertEqual(list(split(r'a "b \" c"')), ["a", r'"b \" c"'])
+
     def test_get_or_create_db(self):
         get_or_create_db("test_db", "sqlite:///superset.db")
         database = 
db.session.query(Database).filter_by(database_name="test_db").one()

Reply via email to