Repository: spark
Updated Branches:
  refs/heads/master e35ad3cad -> 4e861db5f


[SPARK-16406][SQL] Improve performance of LogicalPlan.resolve

## What changes were proposed in this pull request?

`LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a 
name. This is fine in normal cases, but gets problematic when you try to 
resolve a large number of columns on a plan with a large number of attributes.

This PR adds an indexing structure to `resolve(...)` in order to find potential 
matches quicker. This PR improves the reference resolution time for the 
following code by 4x (11.8s -> 2.4s):

``` scala
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query =
  s"""
     |SELECT $columns
     |FROM VALUES ($values) T($columns)
     |WHERE 1=2 AND 1 IN ($columns)
     |GROUP BY $columns
     |ORDER BY $columns
     |""".stripMargin

spark.time(sql(query))
```
## How was this patch tested?

Existing tests.

Author: Herman van Hovell <[email protected]>

Closes #14083 from hvanhovell/SPARK-16406.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e861db5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e861db5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e861db5

Branch: refs/heads/master
Commit: 4e861db5f149e10fd8dfe6b3c1484821a590b1e8
Parents: e35ad3c
Author: Herman van Hovell <[email protected]>
Authored: Mon May 7 11:21:22 2018 +0200
Committer: Herman van Hovell <[email protected]>
Committed: Mon May 7 11:21:22 2018 +0200

----------------------------------------------------------------------
 .../sql/catalyst/expressions/package.scala      |  86 +++++++++++++++
 .../catalyst/plans/logical/LogicalPlan.scala    | 108 ++-----------------
 2 files changed, 93 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4e861db5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 1a48995..8a06daa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -17,8 +17,12 @@
 
 package org.apache.spark.sql.catalyst
 
+import java.util.Locale
+
 import com.google.common.collect.Maps
 
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -138,6 +142,88 @@ package object expressions  {
     def indexOf(exprId: ExprId): Int = {
       Option(exprIdToOrdinal.get(exprId)).getOrElse(-1)
     }
+
+    private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = 
{
+      m.mapValues(_.distinct).map(identity)
+    }
+
+    /** Map to use for direct case insensitive attribute lookups. */
+    @transient private lazy val direct: Map[String, Seq[Attribute]] = {
+      unique(attrs.groupBy(_.name.toLowerCase(Locale.ROOT)))
+    }
+
+    /** Map to use for qualified case insensitive attribute lookups. */
+    @transient private val qualified: Map[(String, String), Seq[Attribute]] = {
+      val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a =>
+        (a.qualifier.get.toLowerCase(Locale.ROOT), 
a.name.toLowerCase(Locale.ROOT))
+      }
+      unique(grouped)
+    }
+
+    /** Perform attribute resolution given a name and a resolver. */
+    def resolve(nameParts: Seq[String], resolver: Resolver): 
Option[NamedExpression] = {
+      // Collect matching attributes given a name and a lookup.
+      def collectMatches(name: String, candidates: Option[Seq[Attribute]]): 
Seq[Attribute] = {
+        candidates.toSeq.flatMap(_.collect {
+          case a if resolver(a.name, name) => a.withName(name)
+        })
+      }
+
+      // Find matches for the given name assuming that the 1st part is a 
qualifier (i.e. table name,
+      // alias, or subquery alias) and the 2nd part is the actual name. This 
returns a tuple of
+      // matched attributes and a list of parts that are to be resolved.
+      //
+      // For example, consider an example where "a" is the table name, "b" is 
the column name,
+      // and "c" is the struct field name, i.e. "a.b.c". In this case, 
Attribute will be "a.b",
+      // and the second element will be List("c").
+      val matches = nameParts match {
+        case qualifier +: name +: nestedFields =>
+          val key = (qualifier.toLowerCase(Locale.ROOT), 
name.toLowerCase(Locale.ROOT))
+          val attributes = collectMatches(name, qualified.get(key)).filter { a 
=>
+            resolver(qualifier, a.qualifier.get)
+          }
+          (attributes, nestedFields)
+        case all =>
+          (Nil, all)
+      }
+
+      // If none of attributes match `table.column` pattern, we try to resolve 
it as a column.
+      val (candidates, nestedFields) = matches match {
+        case (Seq(), _) =>
+          val name = nameParts.head
+          val attributes = collectMatches(name, 
direct.get(name.toLowerCase(Locale.ROOT)))
+          (attributes, nameParts.tail)
+        case _ => matches
+      }
+
+      def name = UnresolvedAttribute(nameParts).name
+      candidates match {
+        case Seq(a) if nestedFields.nonEmpty =>
+          // One match, but we also need to extract the requested nested field.
+          // The foldLeft adds ExtractValues for every remaining parts of the 
identifier,
+          // and aliased it with the last part of the name.
+          // For example, consider "a.b.c", where "a" is resolved to an 
existing attribute.
+          // Then this will add ExtractValue("c", ExtractValue("b", a)), and 
alias the final
+          // expression as "c".
+          val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, name) =>
+            ExtractValue(e, Literal(name), resolver)
+          }
+          Some(Alias(fieldExprs, nestedFields.last)())
+
+        case Seq(a) =>
+          // One match, no nested fields, use it.
+          Some(a)
+
+        case Seq() =>
+          // No matches.
+          None
+
+        case ambiguousReferences =>
+          // More than one match.
+          val referenceNames = 
ambiguousReferences.map(_.qualifiedName).mkString(", ")
+          throw new AnalysisException(s"Reference '$name' is ambiguous, could 
be: $referenceNames.")
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4e861db5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4203440..e487693 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -86,6 +86,10 @@ abstract class LogicalPlan
     }
   }
 
+  private[this] lazy val childAttributes = 
AttributeSeq(children.flatMap(_.output))
+
+  private[this] lazy val outputAttributes = AttributeSeq(output)
+
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] using the 
input from all child
    * nodes of this LogicalPlan. The attribute is expressed as
@@ -94,7 +98,7 @@ abstract class LogicalPlan
   def resolveChildren(
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
-    resolve(nameParts, children.flatMap(_.output), resolver)
+    childAttributes.resolve(nameParts, resolver)
 
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] based on 
the output of this
@@ -104,7 +108,7 @@ abstract class LogicalPlan
   def resolve(
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
-    resolve(nameParts, output, resolver)
+    outputAttributes.resolve(nameParts, resolver)
 
   /**
    * Given an attribute name, split it to name parts by dot, but
@@ -114,105 +118,7 @@ abstract class LogicalPlan
   def resolveQuoted(
       name: String,
       resolver: Resolver): Option[NamedExpression] = {
-    resolve(UnresolvedAttribute.parseAttributeName(name), output, resolver)
-  }
-
-  /**
-   * Resolve the given `name` string against the given attribute, returning 
either 0 or 1 match.
-   *
-   * This assumes `name` has multiple parts, where the 1st part is a qualifier
-   * (i.e. table name, alias, or subquery alias).
-   * See the comment above `candidates` variable in resolve() for semantics 
the returned data.
-   */
-  private def resolveAsTableColumn(
-      nameParts: Seq[String],
-      resolver: Resolver,
-      attribute: Attribute): Option[(Attribute, List[String])] = {
-    assert(nameParts.length > 1)
-    if (attribute.qualifier.exists(resolver(_, nameParts.head))) {
-      // At least one qualifier matches. See if remaining parts match.
-      val remainingParts = nameParts.tail
-      resolveAsColumn(remainingParts, resolver, attribute)
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Resolve the given `name` string against the given attribute, returning 
either 0 or 1 match.
-   *
-   * Different from resolveAsTableColumn, this assumes `name` does NOT start 
with a qualifier.
-   * See the comment above `candidates` variable in resolve() for semantics 
the returned data.
-   */
-  private def resolveAsColumn(
-      nameParts: Seq[String],
-      resolver: Resolver,
-      attribute: Attribute): Option[(Attribute, List[String])] = {
-    if (resolver(attribute.name, nameParts.head)) {
-      Option((attribute.withName(nameParts.head), nameParts.tail.toList))
-    } else {
-      None
-    }
-  }
-
-  /** Performs attribute resolution given a name and a sequence of possible 
attributes. */
-  protected def resolve(
-      nameParts: Seq[String],
-      input: Seq[Attribute],
-      resolver: Resolver): Option[NamedExpression] = {
-
-    // A sequence of possible candidate matches.
-    // Each candidate is a tuple. The first element is a resolved attribute, 
followed by a list
-    // of parts that are to be resolved.
-    // For example, consider an example where "a" is the table name, "b" is 
the column name,
-    // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute 
will be "a.b",
-    // and the second element will be List("c").
-    var candidates: Seq[(Attribute, List[String])] = {
-      // If the name has 2 or more parts, try to resolve it as `table.column` 
first.
-      if (nameParts.length > 1) {
-        input.flatMap { option =>
-          resolveAsTableColumn(nameParts, resolver, option)
-        }
-      } else {
-        Seq.empty
-      }
-    }
-
-    // If none of attributes match `table.column` pattern, we try to resolve 
it as a column.
-    if (candidates.isEmpty) {
-      candidates = input.flatMap { candidate =>
-        resolveAsColumn(nameParts, resolver, candidate)
-      }
-    }
-
-    def name = UnresolvedAttribute(nameParts).name
-
-    candidates.distinct match {
-      // One match, no nested fields, use it.
-      case Seq((a, Nil)) => Some(a)
-
-      // One match, but we also need to extract the requested nested field.
-      case Seq((a, nestedFields)) =>
-        // The foldLeft adds ExtractValues for every remaining parts of the 
identifier,
-        // and aliased it with the last part of the name.
-        // For example, consider "a.b.c", where "a" is resolved to an existing 
attribute.
-        // Then this will add ExtractValue("c", ExtractValue("b", a)), and 
alias the final
-        // expression as "c".
-        val fieldExprs = nestedFields.foldLeft(a: Expression)((expr, 
fieldName) =>
-          ExtractValue(expr, Literal(fieldName), resolver))
-        Some(Alias(fieldExprs, nestedFields.last)())
-
-      // No matches.
-      case Seq() =>
-        logTrace(s"Could not find $name in ${input.mkString(", ")}")
-        None
-
-      // More than one match.
-      case ambiguousReferences =>
-        val referenceNames = 
ambiguousReferences.map(_._1.qualifiedName).mkString(", ")
-        throw new AnalysisException(
-          s"Reference '$name' is ambiguous, could be: $referenceNames.")
-    }
+    outputAttributes.resolve(UnresolvedAttribute.parseAttributeName(name), 
resolver)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to