This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
commit 91e74f28a3627404e2802afa0da24a5e364e611f
Author:     Ivan Gagarkin <[email protected]>
AuthorDate: Tue Oct 4 18:54:14 2022 +0300

    IGNITE-12519 Add support for lowercase object names. Fixes #178

    Signed-off-by: Slava Koptilin <[email protected]>
---
 .../org/apache/ignite/spark/impl/QueryUtils.scala |  12 ++
 .../impl/optimization/AggregateExpressions.scala  |   2 +-
 .../impl/optimization/ConditionExpressions.scala  |   2 +-
 .../spark/impl/optimization/DateExpressions.scala |   2 +-
 .../spark/impl/optimization/MathExpressions.scala |   2 +-
 .../impl/optimization/SimpleExpressions.scala     |  19 +--
 .../impl/optimization/StringExpressions.scala     |   2 +-
 .../impl/optimization/SupportedExpressions.scala  |   2 +-
 .../impl/optimization/SystemExpressions.scala     |   2 +-
 .../accumulator/JoinSQLAccumulator.scala          |  53 ++++----
 .../accumulator/SingleTableSQLAccumulator.scala   |  37 ++++--
 .../accumulator/UnionSQLAccumulator.scala         |  14 ++-
 .../impl/optimization/accumulator/package.scala   |  31 +++++
 .../ignite/spark/impl/optimization/package.scala  |  10 +-
 .../ignite/spark/IgniteQueryCompilatorSpec.scala  | 139 +++++++++++++++++++++
 15 files changed, 273 insertions(+), 56 deletions(-)

diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
index 79aa523..8437b2f 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
@@ -26,6 +26,18 @@ import org.apache.spark.sql.types._
  * Utility class for building SQL queries.
  */
private[impl] object QueryUtils extends Logging {
+    /** Adds quotes to the provided string if needed.
+      * @param str String to be quoted.
+      * @param needed Boolean flag that indicates whether the given string needs to be quoted.
+      * @return The quoted string when `needed` is true, the original string otherwise.
+      */
+    def quoteStringIfNeeded(str: String, needed: Boolean): String = {
+        if (needed)
+            "\"" + str + "\""
+        else
+            str
+    }
+
     /**
      * Builds `where` part of SQL query.
     *
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
index 3e6b6b5..51f2fee 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
@@ -51,7 +51,7 @@ private[optimization] object AggregateExpressions extends SupportedExpressions {
 
     /** @inheritdoc */
     override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
-        useAlias: Boolean): Option[String] = expr match {
+        useAlias: Boolean, caseSensitive:Boolean): Option[String] = expr match {
         case AggregateExpression(aggregateFunction, _, isDistinct, _, _) ⇒
             aggregateFunction match {
                 case Count(children) ⇒
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
index fbfbd64..6db2409 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
@@ -78,7 +78,7 @@ private[optimization] object ConditionExpressions extends SupportedExpressions {
 
     /** @inheritdoc */
     override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
-        useAlias: Boolean): Option[String] = expr match {
+        useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
         case EqualTo(left, right) ⇒
             Some(s"${childToString(left)} = ${childToString(right)}")
 
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
index d075bf0..156d4fa 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
@@ -73,7 +73,7 @@ private[optimization] object DateExpressions extends SupportedExpressions {
 
     /** @inheritdoc */
     override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
-        useAlias: Boolean): Option[String] = expr match {
+        useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
         case CurrentDate(_) ⇒
             Some(s"CURRENT_DATE()")
 
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
index 256cd78..99386ac 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
@@ -138,7 +138,7 @@ private[optimization] object MathExpressions extends SupportedExpressions {
 
     /** @inheritdoc */
     override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
-        useAlias: Boolean): Option[String] = expr match {
+        useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
         case Abs(child, _) ⇒
             Some(s"ABS(${childToString(child)})")
 
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
index 37cb9e1..c32ecf5 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.spark.impl.optimization
 
+import org.apache.ignite.spark.impl.QueryUtils.quoteStringIfNeeded
+
 import java.text.SimpleDateFormat
 import org.apache.spark.sql.catalyst.expressions.{Expression, _}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import java.time.ZoneOffset
-import java.util.TimeZone
 
 /**
  * Object to support some 'simple' expressions like aliases.
@@ -49,7 +50,7 @@ private[optimization] object SimpleExpressions extends SupportedExpressions {
 
     /** @inheritdoc */
     override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
-        useAlias: Boolean): Option[String] = expr match {
+        useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
         case l: Literal ⇒
             if (l.value == null)
                 Some("null")
@@ -75,8 +76,8 @@ private[optimization] object SimpleExpressions extends SupportedExpressions {
                     //Internal representation of DateType is Int.
                     //So we converting from internal spark representation to CAST call.
                     case days: Integer ⇒
-                        val date = new java.util.Date(DateTimeUtils.microsToMillis(DateTimeUtils.daysToMicros(days, ZoneOffset
-                            .UTC))) // FIXME: default id
+                        val date = new java.util.Date(DateTimeUtils.microsToMillis(
+                            DateTimeUtils.daysToMicros(days, ZoneOffset.UTC))) // FIXME: default id
 
                         Some(s"CAST('${dateFormat.get.format(date)}' AS DATE)")
 
@@ -91,11 +92,13 @@ private[optimization] object SimpleExpressions extends SupportedExpressions {
         case ar: AttributeReference ⇒
             val name =
                 if (useQualifier)
-                    // TODO: add ticket to handle seq with two elements with qualifier for database name: related to the [SPARK-19602][SQL] ticket
-                    ar.qualifier.map(_ + "." + ar.name).find(_ => true).getOrElse(ar.name)
+                    // TODO: add ticket to handle seq with two elements with qualifier for database name: related to the [SPARK-19602][SQL] ticket
+                    ar.qualifier
+                        .map(quoteStringIfNeeded(_, caseSensitive))
+                        .map(_ + "." + quoteStringIfNeeded(ar.name, caseSensitive))
+                        .find(_ => true).getOrElse(ar.name)
                 else
-                    ar.name
-
+                    quoteStringIfNeeded(ar.name, caseSensitive)
 
             if (ar.metadata.contains(ALIAS) &&
                 !isAliasEqualColumnName(ar.metadata.getString(ALIAS), ar.name) &&
                 useAlias) {
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
index 733fe80..af10282 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
@@ -96,7 +96,7 @@ private[optimization] object StringExpressions extends SupportedExpressions {
 
     /** @inheritdoc */
     override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
-        useAlias: Boolean): Option[String] = expr match {
+        useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
         case Ascii(child) ⇒
             Some(s"ASCII(${childToString(child)})")
 
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
index f46eb72..3926889 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
@@ -38,5 +38,5 @@ private[optimization] trait SupportedExpressions {
      * @return SQL representation of `expr` if it supported. `None` otherwise.
     */
    def toString(expr: Expression, childToString: (Expression) ⇒ String, useQualifier: Boolean,
-        useAlias: Boolean): Option[String]
+        useAlias: Boolean, caseSensitive: Boolean): Option[String]
 }
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
index 40e4e29..66cfc71 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
@@ -71,7 +71,7 @@ private[optimization] object SystemExpressions extends SupportedExpressions {
 
     /** @inheritdoc */
     override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
-        useAlias: Boolean): Option[String] = expr match {
+        useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
         case Coalesce(children) ⇒
             Some(s"COALESCE(${children.map(childToString(_)).mkString(", ")})")
 
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
index 05e5aeb..dd7804e 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
@@ -18,14 +18,15 @@
 package org.apache.ignite.spark.impl.optimization.accumulator
 
 import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.impl.QueryUtils.quoteStringIfNeeded
 import org.apache.ignite.spark.impl.optimization._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, RightOuter}
 
 /**
- * Accumulator to store information about join query.
- */
+  * Accumulator to store information about join query.
+  */
 private[apache] case class JoinSQLAccumulator(
     igniteQueryContext: IgniteQueryContext,
     left: QueryAccumulator,
@@ -47,29 +48,35 @@ private[apache] case class JoinSQLAccumulator(
     override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = {
         val delim = if (prettyPrint) "\n" else " "
         val tab = if (prettyPrint) "  " else ""
+        val caseSensitiveEnabled = isCaseSensitiveEnabled(igniteQueryContext)
 
         var sql = s"SELECT$delim$tab" +
-            s"${fixQualifier(outputExpressions).map(exprToString(_, useQualifier = true)).mkString(", ")}$delim" +
+            s"${fixQualifier(outputExpressions)
+                .map(exprToString(_, useQualifier = true, caseSensitive = caseSensitiveEnabled)).mkString(", ")}$delim" +
             s"FROM$delim$tab$compileJoinExpr"
 
         if (allFilters.nonEmpty)
             sql += s"${delim}WHERE$delim$tab" +
-                s"${fixQualifier(allFilters).map(exprToString(_, useQualifier = true)).mkString(s" AND$delim$tab")}"
+                s"${fixQualifier(allFilters)
+                    .map(exprToString(_, useQualifier = true, caseSensitive = caseSensitiveEnabled)).mkString(s" AND$delim$tab")}"
 
         if (groupBy.exists(_.nonEmpty))
             sql += s"${delim}GROUP BY " +
-                s"${fixQualifier(groupBy.get).map(exprToString(_, useQualifier = true)).mkString(s",$delim$tab")}"
+                s"${fixQualifier(groupBy.get)
+                    .map(exprToString(_, useQualifier = true, caseSensitive = caseSensitiveEnabled)).mkString(s",$delim$tab")}"
 
         if (having.exists(_.nonEmpty))
             sql += s"${delim}HAVING " +
-                s"${fixQualifier(having.get).map(exprToString(_, useQualifier = true)).mkString(s" AND$delim$tab")}"
+                s"${fixQualifier(having.get)
+                    .map(exprToString(_, useQualifier = true, caseSensitive = caseSensitiveEnabled)).mkString(s" AND$delim$tab")}"
 
         if (orderBy.exists(_.nonEmpty))
            sql += s"${delim}ORDER BY " +
-                s"${fixQualifier(orderBy.get).map(exprToString(_, useQualifier = true)).mkString(s",$delim$tab")}"
+                s"${fixQualifier(orderBy.get)
+                    .map(exprToString(_, useQualifier = true, caseSensitive = caseSensitiveEnabled)).mkString(s",$delim$tab")}"
 
         if (limit.isDefined) {
-            sql += s" LIMIT ${exprToString(fixQualifier0(limit.get), useQualifier = true)}"
+            sql += s" LIMIT ${exprToString(fixQualifier0(limit.get), useQualifier = true, caseSensitive = caseSensitiveEnabled)}"
 
             if (nestedQuery)
                 sql = s"SELECT * FROM ($sql)"
@@ -79,8 +86,8 @@ private[apache] case class JoinSQLAccumulator(
     }
 
     /**
-     * @return Filters for this query.
-     */
+      * @return Filters for this query.
+      */
     private def allFilters: Seq[Expression] = {
         val leftFilters =
             if (isSimpleTableAcc(left))
@@ -97,24 +104,27 @@ private[apache] case class JoinSQLAccumulator(
     }
 
     /**
-     * @return `table1 LEFT JOIN (SELECT....FROM...) table2` part of join query.
-     */
+      * @return `table1 LEFT JOIN (SELECT....FROM...) table2` part of join query.
+      */
     private def compileJoinExpr: String = {
+        val caseSensitiveEnabled = isCaseSensitiveEnabled(igniteQueryContext)
+
         val leftJoinSql =
             if (isSimpleTableAcc(left))
-                left.asInstanceOf[SingleTableSQLAccumulator].table.get
+                quoteStringIfNeeded(left.asInstanceOf[SingleTableSQLAccumulator].table.get, caseSensitiveEnabled)
             else
                 s"(${left.compileQuery()}) ${leftAlias.get}"
 
         val rightJoinSql = {
             val leftTableName =
                 if (isSimpleTableAcc(left))
-                    left.qualifier
+                    quoteStringIfNeeded(left.qualifier, caseSensitiveEnabled)
                 else
                     leftAlias.get
 
             if (isSimpleTableAcc(right)) {
-                val rightTableName = right.asInstanceOf[SingleTableSQLAccumulator].table.get
+                val rightTableName =
+                    quoteStringIfNeeded(right.asInstanceOf[SingleTableSQLAccumulator].table.get, caseSensitiveEnabled)
 
                 if (leftTableName == rightTableName)
                     s"$rightTableName as ${rightAlias.get}"
@@ -125,7 +135,8 @@ private[apache] case class JoinSQLAccumulator(
         }
 
         s"$leftJoinSql $joinTypeSQL $rightJoinSql" +
-            s"${condition.map(expr ⇒ s" ON ${exprToString(fixQualifier0(expr), useQualifier = true)}").getOrElse("")}"
+            s"${condition.map(expr ⇒ s" ON ${exprToString(fixQualifier0(expr),
+                useQualifier = true, caseSensitive = caseSensitiveEnabled)}").getOrElse("")}"
     }
 
     /**
@@ -170,11 +181,11 @@ private[apache] case class JoinSQLAccumulator(
     }
 
     /**
-     * Find right qualifier for a `attr`.
-     *
-     * @param attr Attribute to fix qualifier in
-     * @return Right qualifier for a `attr`
-     */
+      * Find right qualifier for a `attr`.
+      *
+      * @param attr Attribute to fix qualifier in
+      * @return Right qualifier for a `attr`
+      */
     private def findQualifier(attr: AttributeReference): String = {
         val leftTableName =
             if (isSimpleTableAcc(left))
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
index 2f56d9e..129e08b 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
@@ -23,10 +23,10 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedEx
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /**
- * Class for accumulating parts of SQL query to a single Ignite table.
- *
- * See <a href="http://www.h2database.com/html/grammar.html#select">select syntax of H2</a>.
- */
+  * Class for accumulating parts of SQL query to a single Ignite table.
+  *
+  * See <a href="http://www.h2database.com/html/grammar.html#select">select syntax of H2</a>.
+  */
 private[apache] case class SingleTableSQLAccumulator(
     igniteQueryContext: IgniteQueryContext,
     table: Option[String],
@@ -45,24 +45,30 @@ private[apache] case class SingleTableSQLAccumulator(
     override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = {
         val delim = if (prettyPrint) "\n" else " "
         val tab = if (prettyPrint) "  " else ""
+        val caseSensitiveEnabled = isCaseSensitiveEnabled(igniteQueryContext)
 
-        var sql = s"SELECT$delim$tab${outputExpressions.map(exprToString(_)).mkString(", ")}${delim}" +
+        var sql = s"SELECT$delim$tab${outputExpressions
+            .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(", ")}${delim}" +
             s"FROM$delim$tab$compiledTableExpression"
 
         if (where.exists(_.nonEmpty))
-            sql += s"${delim}WHERE$delim$tab${where.get.map(exprToString(_)).mkString(s" AND$delim$tab")}"
+            sql += s"${delim}WHERE$delim$tab${where.get
+                .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(s" AND$delim$tab")}"
 
         if (groupBy.exists(_.nonEmpty))
-            sql += s"${delim}GROUP BY ${groupBy.get.map(exprToString(_)).mkString(s",$delim$tab")}"
+            sql += s"${delim}GROUP BY ${groupBy.get
+                .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(s",$delim$tab")}"
 
         if (having.exists(_.nonEmpty))
-            sql += s"${delim}HAVING ${having.get.map(exprToString(_)).mkString(s" AND$delim$tab")}"
+            sql += s"${delim}HAVING ${having.get
+                .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(s" AND$delim$tab")}"
 
         if (orderBy.exists(_.nonEmpty))
-            sql += s"${delim}ORDER BY ${orderBy.get.map(exprToString(_)).mkString(s",$delim$tab")}"
+            sql += s"${delim}ORDER BY ${orderBy.get
+                .map(exprToString(_,caseSensitive = caseSensitiveEnabled)).mkString(s",$delim$tab")}"
 
         if (limit.isDefined) {
-            sql += s" LIMIT ${limit.map(exprToString(_)).get}"
+            sql += s" LIMIT ${limit.map(exprToString(_, caseSensitive = caseSensitiveEnabled)).get}"
 
             if (nestedQuery)
                 sql = s"SELECT * FROM ($sql)"
@@ -72,11 +78,16 @@ private[apache] case class SingleTableSQLAccumulator(
     }
 
     /**
-     * @return From table SQL query part.
-     */
+      * @return From table SQL query part.
+      */
     private def compiledTableExpression: String = table match {
         case Some(tableName) ⇒
-            tableName
+            val caseSens = igniteQueryContext.sqlContext
+                .getConf("spark.sql.caseSensitive", "false").toBoolean
+            if (caseSens)
+                "\"" + tableName + "\""
+            else
+                tableName
 
         case None ⇒ tableExpression match {
             case Some((acc, alias)) ⇒
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
index 29bfcda..d49828f 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
@@ -36,18 +36,20 @@ private[apache] case class UnionSQLAccumulator(
     override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = {
         val delim = if (prettyPrint) "\n" else " "
         val tab = if (prettyPrint) "  " else ""
+        val caseSensitiveEnabled = isCaseSensitiveEnabled(igniteQueryContext)
 
         var query = children.map(_.compileQuery(prettyPrint, nestedQuery = true)).mkString(s"${delim}UNION$delim")
 
         query = orderBy match {
             case Some(sortOrders) ⇒
-                query + s"${delim}ORDER BY ${sortOrders.map(exprToString(_)).mkString(s",$delim$tab")}"
+                query + s"${delim}ORDER BY ${sortOrders
+                    .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(s",$delim$tab")}"
 
             case None ⇒ query
         }
 
         if (limit.isDefined) {
-            query += s" LIMIT ${exprToString(limit.get)}"
+            query += s" LIMIT ${exprToString(limit.get, caseSensitive = caseSensitiveEnabled)}"
 
             if (nestedQuery)
                 query = s"SELECT * FROM ($query)"
@@ -57,8 +59,12 @@ private[apache] case class UnionSQLAccumulator(
     }
 
     /** @inheritdoc */
-    override def simpleString(maxFields: Int): String =
-        s"UnionSQLAccumulator(orderBy: ${orderBy.map(_.map(exprToString(_)).mkString(", ")).getOrElse("[]")})"
+    override def simpleString(maxFields: Int): String = {
+        val caseSensitiveEnabled = isCaseSensitiveEnabled(igniteQueryContext)
+
+        s"UnionSQLAccumulator(orderBy: ${orderBy.map(_.map(exprToString(_, caseSensitive = caseSensitiveEnabled))
+            .mkString(", ")).getOrElse("[]")})"
+    }
 
     /** @inheritdoc */
     override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): QueryAccumulator =
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/package.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/package.scala
new file mode 100644
index 0000000..5416b80
--- /dev/null
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/package.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+package object accumulator {
+
+    /**
+      * Reads the Spark context and returns the value of the "spark.sql.caseSensitive" property.
+      * @param igniteQueryContext: IgniteQueryContext
+      * @return Value of the "spark.sql.caseSensitive" config property.
+      */
+    def isCaseSensitiveEnabled(igniteQueryContext: IgniteQueryContext): Boolean = {
+        igniteQueryContext.sqlContext
+            .getConf("spark.sql.caseSensitive", "false").toBoolean
+    }
+}
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
index 5526cad..3c221a8 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
@@ -53,15 +53,19 @@ package object optimization {
      * @param useAlias If true outputs `expr` with alias.
      * @return String representation of expression.
      */
-    def exprToString(expr: Expression, useQualifier: Boolean = false, useAlias: Boolean = true): String = {
+    def exprToString(expr: Expression,
+        useQualifier: Boolean = false,
+        useAlias: Boolean = true,
+        caseSensitive: Boolean = false): String = {
         @tailrec
         def exprToString0(expr: Expression, supportedExpressions: List[SupportedExpressions]): Option[String] =
             if (supportedExpressions.nonEmpty) {
                 val exprStr = supportedExpressions.head.toString(
                     expr,
-                    exprToString(_, useQualifier, useAlias = false),
+                    exprToString(_, useQualifier, useAlias = false, caseSensitive),
                     useQualifier,
-                    useAlias)
+                    useAlias,
+                    caseSensitive)
 
                 exprStr match {
                     case res: Some[String] ⇒
diff --git a/modules/spark-3.2-ext/spark-3.2/src/test/scala/org/apache/ignite/spark/IgniteQueryCompilatorSpec.scala b/modules/spark-3.2-ext/spark-3.2/src/test/scala/org/apache/ignite/spark/IgniteQueryCompilatorSpec.scala
new file mode 100644
index 0000000..c0054c6
--- /dev/null
+++ b/modules/spark-3.2-ext/spark-3.2/src/test/scala/org/apache/ignite/spark/IgniteQueryCompilatorSpec.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatestplus.junit.JUnitRunner
+
+import java.lang.{Long => JLong}
+
+@RunWith(classOf[JUnitRunner])
+class IgniteQueryCompilatorSpec extends AbstractDataFrameSpec {
+    var igniteSession: IgniteSparkSession = _
+
+    describe("Supported column and table names in lower case") {
+
+        it("should successfully read table data via DataFrameReader") {
+            val igniteDF = igniteSession.read
+                .format(FORMAT_IGNITE)
+                .option(OPTION_TABLE, "strings1")
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .load()
+
+            assertResult(9)(igniteDF.count())
+        }
+
+        it("should successfully read table data from a single table via sql()") {
+            val df = igniteSession.sql("SELECT UPPER(str) FROM strings1 WHERE id = 1")
+
+            checkOptimizationResult(df, "SELECT UPPER(\"str\") AS \"upper(str)\" FROM \"strings1\" WHERE \"id\" = 1")
+
+            val data = Tuple1("AAA")
+
+            checkQueryData(df, data)
+        }
+
+        it("should successfully read table data from unioned tables via sql()") {
+            val df = igniteSession.sql(
+                "SELECT UPPER(str) FROM strings1 WHERE id = 1 " +
+                    "UNION " +
+                    "SELECT UPPER(str) FROM strings2 WHERE id = 7"
+            )
+
+            checkOptimizationResult(df, "SELECT \"upper(str)\" FROM (" +
+                "SELECT UPPER(\"str\") AS \"upper(str)\" FROM \"strings1\" WHERE \"id\" = 1 " +
+                "UNION " +
+                "SELECT UPPER(\"str\") AS \"upper(str)\" FROM \"strings2\" WHERE \"id\" = 7" +
+                ") table1")
+
+            val data = (
+                ("222"),
+                ("AAA")
+            )
+
+            checkQueryData(df, data)
+        }
+
+        it("should successfully read table data from joined tables via sql()") {
+            val df = igniteSession.sql("SELECT UPPER(s1.str) FROM strings1 s1 JOIN strings2 s2 ON s1.id = s2.id " +
+                "WHERE s1.id = 1")
+
+            checkOptimizationResult(df, "SELECT UPPER(\"strings1\".\"str\") AS \"upper(str)\" " +
+                "FROM \"strings1\" JOIN \"strings2\" ON \"strings1\".\"id\" = \"strings2\".\"id\" " +
+                "WHERE \"strings1\".\"id\" = 1 AND \"strings2\".\"id\" = 1")
+
+            val data = Tuple1("AAA")
+
+            checkQueryData(df, data)
+        }
+
+    }
+
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+
+        createStringTable(client, DEFAULT_CACHE, "strings1")
+        createStringTable(client, DEFAULT_CACHE, "strings2")
+
+        val configProvider = enclose(null)(x ⇒ () ⇒ {
+            val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+            cfg.setClientMode(true)
+
+            cfg.setIgniteInstanceName("client-2")
+
+            cfg
+        })
+
+        igniteSession = IgniteSparkSession.builder()
+            .config(spark.sparkContext.getConf)
+            .config("spark.sql.caseSensitive", "true")
+            .igniteConfigProvider(configProvider)
+            .getOrCreate()
+    }
+
+    def createStringTable(client: Ignite, cacheName: String, tableName: String): Unit = {
+        val cache = client.cache(cacheName)
+
+        cache.query(new SqlFieldsQuery(
+            s"""
+               | CREATE TABLE "$tableName" (
+               |    "id" LONG,
+               |    "str" VARCHAR,
+               |    PRIMARY KEY ("id")) WITH "backups=1"
+            """.stripMargin)).getAll
+
+        val qry = new SqlFieldsQuery(s"""INSERT INTO \"$tableName\" (\"id\", \"str\") values (?, ?)""")
+
+        cache.query(qry.setArgs(1L.asInstanceOf[JLong], "aaa")).getAll
+        cache.query(qry.setArgs(2L.asInstanceOf[JLong], "AAA")).getAll
+        cache.query(qry.setArgs(3L.asInstanceOf[JLong], "AAA ")).getAll
+        cache.query(qry.setArgs(4L.asInstanceOf[JLong], " AAA")).getAll
+        cache.query(qry.setArgs(5L.asInstanceOf[JLong], " AAA ")).getAll
+        cache.query(qry.setArgs(6L.asInstanceOf[JLong], "ABCDEF")).getAll
+        cache.query(qry.setArgs(7L.asInstanceOf[JLong], "222")).getAll
+        cache.query(qry.setArgs(8L.asInstanceOf[JLong], null)).getAll
+        cache.query(qry.setArgs(9L.asInstanceOf[JLong], "BAAAB")).getAll
+    }
+}
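
For reference, a minimal usage sketch of the new behaviour, modelled on IgniteQueryCompilatorSpec above (the config provider and the "strings1" table are the ones created in its beforeAll(); nothing here is an additional API):

    // Illustrative only: with spark.sql.caseSensitive=true the optimizer now
    // quotes every object name, so lowercase Ignite tables and columns can be
    // queried directly from Spark SQL.
    val igniteSession = IgniteSparkSession.builder()
        .config(spark.sparkContext.getConf)
        .config("spark.sql.caseSensitive", "true")   // enables identifier quoting
        .igniteConfigProvider(configProvider)        // provider built in beforeAll()
        .getOrCreate()

    val df = igniteSession.sql("SELECT UPPER(str) FROM strings1 WHERE id = 1")
    // Generated Ignite SQL (see checkOptimizationResult above):
    //   SELECT UPPER("str") AS "upper(str)" FROM "strings1" WHERE "id" = 1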
