IGNITE-7077: Implementation of Spark query optimization. - Fixes #3397. Signed-off-by: Nikolay Izhikov <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3c3a24e8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3c3a24e8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3c3a24e8 Branch: refs/heads/ignite-2.5 Commit: 3c3a24e84a9a9f07d0833b29524de244930634c3 Parents: c2a8bbc Author: Nikolay Izhikov <[email protected]> Authored: Fri Apr 20 11:23:42 2018 +0300 Committer: Nikolay Izhikov <[email protected]> Committed: Mon Apr 23 15:15:47 2018 +0300 ---------------------------------------------------------------------- .../ignite/spark/IgniteDataFrameSettings.scala | 9 + .../org/apache/ignite/spark/IgniteRDD.scala | 7 +- .../spark/impl/IgniteRelationProvider.scala | 20 +- .../impl/IgniteSQLAccumulatorRelation.scala | 98 ++++ .../spark/impl/IgniteSQLDataFrameRDD.scala | 16 +- .../ignite/spark/impl/IgniteSQLRelation.scala | 67 +-- .../optimization/AggregateExpressions.scala | 114 ++++ .../optimization/ConditionExpressions.scala | 160 ++++++ .../impl/optimization/DateExpressions.scala | 127 +++++ .../impl/optimization/IgniteQueryContext.scala | 52 ++ .../impl/optimization/MathExpressions.scala | 263 +++++++++ .../impl/optimization/SimpleExpressions.scala | 180 ++++++ .../impl/optimization/StringExpressions.scala | 154 ++++++ .../optimization/SupportedExpressions.scala | 42 ++ .../impl/optimization/SystemExpressions.scala | 122 +++++ .../accumulator/JoinSQLAccumulator.scala | 222 ++++++++ .../accumulator/QueryAccumulator.scala | 70 +++ .../accumulator/SelectAccumulator.scala | 70 +++ .../accumulator/SingleTableSQLAccumulator.scala | 124 +++++ .../accumulator/UnionSQLAccumulator.scala | 63 +++ .../spark/impl/optimization/package.scala | 230 ++++++++ .../org/apache/ignite/spark/impl/package.scala | 48 +- .../spark/sql/ignite/IgniteOptimization.scala | 436 +++++++++++++++ .../spark/sql/ignite/IgniteSparkSession.scala | 10 +- .../ignite/spark/AbstractDataFrameSpec.scala | 68 ++- 
.../apache/ignite/spark/IgniteCatalogSpec.scala | 7 +- .../spark/IgniteDataFrameSchemaSpec.scala | 5 +- .../ignite/spark/IgniteDataFrameSuite.scala | 9 +- .../IgniteOptimizationAggregationFuncSpec.scala | 189 +++++++ .../spark/IgniteOptimizationDateFuncSpec.scala | 230 ++++++++ .../IgniteOptimizationDisableEnableSpec.scala | 127 +++++ .../spark/IgniteOptimizationJoinSpec.scala | 543 +++++++++++++++++++ .../spark/IgniteOptimizationMathFuncSpec.scala | 358 ++++++++++++ .../ignite/spark/IgniteOptimizationSpec.scala | 305 +++++++++++ .../IgniteOptimizationStringFuncSpec.scala | 313 +++++++++++ .../IgniteOptimizationSystemFuncSpec.scala | 147 +++++ 36 files changed, 4924 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala index 6bff476..9daaec4 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala @@ -153,4 +153,13 @@ object IgniteDataFrameSettings { * @see [[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]] */ val OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS = "streamerPerNodeParallelOperations" + + /** + * Option for a [[org.apache.spark.sql.SparkSession]] configuration. + * If `true` then all Ignite optimization of Spark SQL statements will be disabled. + * Default value is `false`. 
+ * + * @see [[org.apache.spark.sql.ignite.IgniteOptimization]] + */ + val OPTION_DISABLE_SPARK_SQL_OPTIMIZATION = "ignite.disableSparkSQLOptimization" } http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index d87ea0a..5fb81b6 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -344,6 +344,11 @@ class IgniteRDD[K, V] ( object IgniteRDD { /** + * Default decimal type. + */ + private[spark] val DECIMAL = DecimalType(DecimalType.MAX_PRECISION, 3) + + /** * Gets Spark data type based on type name. * * @param typeName Type name. @@ -357,7 +362,7 @@ object IgniteRDD { case "java.lang.Long" ⇒ LongType case "java.lang.Float" ⇒ FloatType case "java.lang.Double" ⇒ DoubleType - case "java.math.BigDecimal" ⇒ DataTypes.createDecimalType() + case "java.math.BigDecimal" ⇒ DECIMAL case "java.lang.String" ⇒ StringType case "java.util.Date" ⇒ DateType case "java.sql.Date" ⇒ DateType
org.apache.ignite.spark.impl.QueryHelper.{createTable, dropTable, ensureCreateTableOptions, saveTable} import org.apache.spark.sql.SaveMode.{Append, Overwrite} import org.apache.spark.sql.ignite.IgniteExternalCatalog.{IGNITE_PROTOCOL, OPTION_GRID} +import org.apache.spark.sql.ignite.IgniteOptimization import org.apache.spark.sql.sources._ import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} @@ -176,11 +177,28 @@ class IgniteRelationProvider extends RelationProvider * @param sqlCtx SQL context. * @return Ignite SQL relation. */ - private def createRelation(igniteCtx: IgniteContext, tblName: String, sqlCtx: SQLContext): BaseRelation = + private def createRelation(igniteCtx: IgniteContext, tblName: String, sqlCtx: SQLContext): BaseRelation = { + val optimizationDisabled = + sqlCtx.sparkSession.conf.get(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "false").toBoolean + + val experimentalMethods = sqlCtx.sparkSession.sessionState.experimentalMethods + + if (optimizationDisabled) { + experimentalMethods.extraOptimizations = + experimentalMethods.extraOptimizations.filter(_ != IgniteOptimization) + } + else { + val optimizationExists = experimentalMethods.extraOptimizations.contains(IgniteOptimization) + + if (!optimizationExists) + experimentalMethods.extraOptimizations = experimentalMethods.extraOptimizations :+ IgniteOptimization + } + IgniteSQLRelation( igniteCtx, tblName, sqlCtx) + } /** * @param params Params. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala new file mode 100644 index 0000000..6eb600a --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.impl + +import org.apache.ignite.spark.impl +import org.apache.ignite.spark.impl.optimization.accumulator.{JoinSQLAccumulator, QueryAccumulator} +import org.apache.ignite.spark.impl.optimization.isSimpleTableAcc +import org.apache.spark.Partition +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.types.{Metadata, StructField, StructType} + +/** + * Relation to query data from query generated by <code>QueryAccumulator</code>. + * <code>QueryAccumulator</code> is generated by <code>IgniteOptimization</code>. + * + * @see IgniteOptimization + */ +class IgniteSQLAccumulatorRelation[K, V](val acc: QueryAccumulator) + (@transient val sqlContext: SQLContext) extends BaseRelation with TableScan { + + /** @inheritdoc */ + override def schema: StructType = + StructType(acc.output.map { c â + StructField( + name = c.name, + dataType = c.dataType, + nullable = c.nullable, + metadata = Metadata.empty) + }) + + /** @inheritdoc */ + override def buildScan(): RDD[Row] = + IgniteSQLDataFrameRDD[K, V]( + acc.igniteQueryContext.igniteContext, + acc.igniteQueryContext.cacheName, + schema, + acc.compileQuery(), + List.empty, + calcPartitions, + isDistributeJoin(acc) + ) + + /** @inheritdoc */ + override def toString: String = + s"IgniteSQLAccumulatorRelation(columns=[${acc.output.map(_.name).mkString(", ")}], qry=${acc.compileQuery()})" + + /** + * @return Collection of spark partition. + */ + private def calcPartitions: Array[Partition] = + //If accumulator stores some complex query(join, aggregation, limit, order, etc.). + //we has to load data from Ignite as a single Spark partition. 
+ if (!isSimpleTableAcc(acc)){ + val aff = acc.igniteQueryContext.igniteContext.ignite().affinity(acc.igniteQueryContext.cacheName) + + val parts = aff.partitions() + + Array(IgniteDataFramePartition(0, primary = null, igniteParts = (0 until parts).toList)) + } + else + impl.calcPartitions(acc.igniteQueryContext.igniteContext, acc.igniteQueryContext.cacheName) + + /** + * @param acc Plan. + * @return True if plan of one or its children are `JoinSQLAccumulator`, false otherwise. + */ + private def isDistributeJoin(acc: LogicalPlan): Boolean = + acc match { + case _: JoinSQLAccumulator â + true + + case _ â + acc.children.exists(isDistributeJoin) + } +} + +object IgniteSQLAccumulatorRelation { + def apply[K, V](acc: QueryAccumulator): IgniteSQLAccumulatorRelation[K, V] = + new IgniteSQLAccumulatorRelation[K, V](acc)(acc.igniteQueryContext.sqlContext) +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala index 93ef529..ec502fc 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala @@ -36,7 +36,8 @@ class IgniteSQLDataFrameRDD[K, V]( schema: StructType, qryStr: String, args: List[_], - parts: Array[Partition]) extends + parts: Array[Partition], + distributedJoin: Boolean) extends IgniteSqlRDD[Row, JList[_], K, V]( ic, cacheName, @@ -56,13 +57,17 @@ class IgniteSQLDataFrameRDD[K, V]( override def compute(partition: Partition, context: TaskContext): Iterator[Row] = { val qry0 = new SqlFieldsQuery(qryStr) + qry0.setDistributedJoins(distributedJoin) + if (args.nonEmpty) 
qry0.setArgs(args.map(_.asInstanceOf[Object]): _*) val ccfg = ic.ignite().cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]]) - if (ccfg.getCacheMode != CacheMode.REPLICATED) - qry0.setPartitions(partition.asInstanceOf[IgniteDataFramePartition].igniteParts: _*) + val ignitePartition = partition.asInstanceOf[IgniteDataFramePartition] + + if (ccfg.getCacheMode != CacheMode.REPLICATED && ignitePartition.igniteParts.nonEmpty && !distributedJoin) + qry0.setPartitions(ignitePartition.igniteParts: _*) qry = qry0 @@ -76,7 +81,8 @@ object IgniteSQLDataFrameRDD { schema: StructType, qryStr: String, args: List[_], - parts: Array[Partition] = Array(IgnitePartition(0))) = { - new IgniteSQLDataFrameRDD(ic, cacheName, schema, qryStr, args, parts) + parts: Array[Partition] = Array(IgnitePartition(0)), + distributedJoin: Boolean = false): IgniteSQLDataFrameRDD[K, V] = { + new IgniteSQLDataFrameRDD[K, V](ic, cacheName, schema, qryStr, args, parts, distributedJoin) } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala index 1fb8de7..485ddf6 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala @@ -18,10 +18,8 @@ package org.apache.ignite.spark.impl import org.apache.ignite.IgniteException -import org.apache.ignite.cache.{CacheMode, QueryEntity} -import org.apache.ignite.cluster.ClusterNode -import org.apache.ignite.configuration.CacheConfiguration -import org.apache.ignite.spark.{IgniteContext, IgniteRDD} +import org.apache.ignite.cache.QueryEntity +import org.apache.ignite.spark.{IgniteContext, 
IgniteRDD, impl} import org.apache.spark.Partition import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -30,14 +28,13 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.{Row, SQLContext} import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer /** * Apache Ignite implementation of Spark BaseRelation with PrunedFilteredScan for Ignite SQL Tables */ class IgniteSQLRelation[K, V]( - private[spark] val ic: IgniteContext, - private[spark] val tableName: String) + private[apache] val ic: IgniteContext, + private[apache] val tableName: String) (@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with Logging { /** @@ -92,41 +89,13 @@ class IgniteSQLRelation[K, V]( qryAndArgs } - private def calcPartitions(filters: Array[Filter]): Array[Partition] = { - val cache = ic.ignite().cache[K, V](cacheName) - - val ccfg = cache.getConfiguration(classOf[CacheConfiguration[K, V]]) - - if (ccfg.getCacheMode == CacheMode.REPLICATED) { - val serverNodes = ic.ignite().cluster().forCacheNodes(cacheName).forServers().nodes() - - Array(IgniteDataFramePartition(0, serverNodes.head, Stream.from(0).take(1024).toList)) - } - else { - val aff = ic.ignite().affinity(cacheName) - - val parts = aff.partitions() - - val nodesToParts = (0 until parts).foldLeft(Map[ClusterNode, ArrayBuffer[Int]]()) { - case (nodeToParts, ignitePartIdx) â - val primary = aff.mapPartitionToPrimaryAndBackups(ignitePartIdx).head - - if (nodeToParts.contains(primary)) { - nodeToParts(primary) += ignitePartIdx - - nodeToParts - } - else - nodeToParts + (primary â ArrayBuffer[Int](ignitePartIdx)) - } - - val partitions = nodesToParts.zipWithIndex.map { case ((node, nodesParts), i) â - IgniteDataFramePartition(i, node, nodesParts.toList) - } - - partitions.toArray - } - } + /** + * Computes spark partitions for this relation. + * + * @return Array of IgniteDataFramPartition. 
+ */ + private def calcPartitions(filters: Array[Filter]): Array[Partition] = + impl.calcPartitions(ic, cacheName) /** * Cache name for a table name. @@ -134,20 +103,6 @@ class IgniteSQLRelation[K, V]( private lazy val cacheName: String = sqlCacheName(ic.ignite(), tableName) .getOrElse(throw new IgniteException(s"Unknown table $tableName")) - - /** - * Utility method to add clause to sql WHERE string. - * - * @param filterStr Current filter string - * @param clause Clause to add. - * @return Filter string. - */ - private def addStrClause(filterStr: String, clause: String) = - if (filterStr.isEmpty) - clause - else - filterStr + " AND " + clause - } object IgniteSQLRelation { http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala new file mode 100644 index 0000000..421a9a9 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl.optimization + +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.types._ + +/** + * Object to support aggregate expressions like `sum` or `avg`. + */ +private[optimization] object AggregateExpressions extends SupportedExpressions { + /** @inheritdoc */ + def apply(expr: Expression, checkChild: (Expression) â Boolean): Boolean = expr match { + case AggregateExpression(aggregateFunction, _, _, _) â + checkChild(aggregateFunction) + + case Average(child) â + checkChild(child) + + case Count(children) â + children.forall(checkChild) + + case Max(child) â + checkChild(child) + + case Min(child) â + checkChild(child) + + case Sum(child) â + checkChild(child) + + case _ â + false + } + + /** @inheritdoc */ + override def toString(expr: Expression, childToString: Expression â String, useQualifier: Boolean, + useAlias: Boolean): Option[String] = expr match { + case AggregateExpression(aggregateFunction, _, isDistinct, _) â + aggregateFunction match { + case Count(children) â + if (isDistinct) + Some(s"COUNT(DISTINCT ${children.map(childToString(_)).mkString(" ")})") + else + Some(s"COUNT(${children.map(childToString(_)).mkString(" ")})") + + case sum: Sum â + if (isDistinct) + Some(castSum( + s"SUM(DISTINCT ${sum.children.map(childToString(_)).mkString(" ")})", sum.dataType)) + else + Some(castSum(s"SUM(${sum.children.map(childToString(_)).mkString(" ")})", sum.dataType)) + + case _ â + 
Some(childToString(aggregateFunction)) + } + + case Average(child) â + child.dataType match { + case DecimalType() | DoubleType â + Some(s"AVG(${childToString(child)})") + + case _ â + //Spark `AVG` return type is always a double or a decimal. + //See [[org.apache.spark.sql.catalyst.expressions.aggregate.Average]] + //But Ignite `AVG` return type for a integral types is integral. + //To preserve query correct results has to cast column to double. + Some(s"AVG(CAST(${childToString(child)} AS DOUBLE))") + } + + + case Count(children) â + Some(s"COUNT(${children.map(childToString(_)).mkString(" ")})") + + case Max(child) â + Some(s"MAX(${childToString(child)})") + + case Min(child) â + Some(s"MIN(${childToString(child)})") + + case sum: Sum â + Some(castSum(s"SUM(${childToString(sum.child)})", sum.dataType)) + + case _ â + None + } + + /** + * Ignite returns BigDecimal but Spark expects BIGINT. + */ + private def castSum(sumSql: String, dataType: DataType): String = dataType match { + case LongType â + s"CAST($sumSql AS BIGINT)" + + case _ â + s"$sumSql" + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala new file mode 100644 index 0000000..fbfbd64 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.spark.impl.optimization + +import org.apache.spark.sql.catalyst.expressions.{Expression, _} + +/** + * Object to support condition expression. Like `and` or `in` operators. + */ +private[optimization] object ConditionExpressions extends SupportedExpressions { + /** @inheritdoc */ + def apply(expr: Expression, checkChild: (Expression) â Boolean): Boolean = expr match { + case EqualTo(left, right) â + checkChild(left) && checkChild(right) + + case EqualNullSafe(left, right) â + checkChild(left) && checkChild(right) + + case GreaterThan(left, right) â + checkChild(left) && checkChild(right) + + case GreaterThanOrEqual(left, right) â + checkChild(left) && checkChild(right) + + case LessThan(left, right) â + checkChild(left) && checkChild(right) + + case LessThanOrEqual(left, right) â + checkChild(left) && checkChild(right) + + case InSet(child, set) if set.forall(_.isInstanceOf[Literal]) â + checkChild(child) + + case In(child, list) if list.forall(_.isInstanceOf[Literal]) â + checkChild(child) + + case IsNull(child) â + checkChild(child) + + case IsNotNull(child) â + checkChild(child) + + case And(left, right) â + checkChild(left) && checkChild(right) + + case Or(left, right) â + checkChild(left) && checkChild(right) + + case Not(child) â + checkChild(child) + + case StartsWith(left, right) â + checkChild(left) && checkChild(right) + + case EndsWith(left, 
right) â + checkChild(left) && checkChild(right) + + case Contains(left, right) â + checkChild(left) && checkChild(right) + + case _ â + false + } + + /** @inheritdoc */ + override def toString(expr: Expression, childToString: Expression â String, useQualifier: Boolean, + useAlias: Boolean): Option[String] = expr match { + case EqualTo(left, right) â + Some(s"${childToString(left)} = ${childToString(right)}") + + case EqualNullSafe(left, right) â + Some(s"(${childToString(left)} IS NULL OR ${childToString(left)} = ${childToString(right)})") + + case GreaterThan(left, right) â + Some(s"${childToString(left)} > ${childToString(right)}") + + case GreaterThanOrEqual(left, right) â + Some(s"${childToString(left)} >= ${childToString(right)}") + + case LessThan(left, right) â + Some(s"${childToString(left)} < ${childToString(right)}") + + case LessThanOrEqual(left, right) â + Some(s"${childToString(left)} <= ${childToString(right)}") + + case In(attr, values) â + Some(s"${childToString(attr)} IN (${values.map(childToString(_)).mkString(", ")})") + + case IsNull(child) â + Some(s"${childToString(child)} IS NULL") + + case IsNotNull(child) â + Some(s"${childToString(child)} IS NOT NULL") + + case And(left, right) â + Some(s"${childToString(left)} AND ${childToString(right)}") + + case Or(left, right) â + Some(s"${childToString(left)} OR ${childToString(right)}") + + case Not(child) â + Some(s"NOT ${childToString(child)}") + + case StartsWith(attr, value) â { + //Expecting string literal here. + //To add % sign it's required to remove quotes. + val valStr = removeQuotes(childToString(value)) + + Some(s"${childToString(attr)} LIKE '$valStr%'") + } + + case EndsWith(attr, value) â { + //Expecting string literal here. + //To add % sign it's required to remove quotes. + val valStr = removeQuotes(childToString(value)) + + Some(s"${childToString(attr)} LIKE '%$valStr'") + } + + case Contains(attr, value) â { + //Expecting string literal here. 
+ //To add % signs it's required to remove quotes. + val valStr = removeQuotes(childToString(value)) + + Some(s"${childToString(attr)} LIKE '%$valStr%'") + } + + case _ â + None + } + + /** + * @param str String to process. + * @return Str without surrounding quotes. + */ + private def removeQuotes(str: String): String = + if (str.length < 2) + str + else + str match { + case quoted if quoted.startsWith("'") && quoted.endsWith("'") â + quoted.substring(1, quoted.length-1) + + case _ â str + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala new file mode 100644 index 0000000..d075bf0 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.impl.optimization + +import org.apache.spark.sql.catalyst.expressions.{Expression, _} + +/** + * Object to support expressions to work with date/timestamp. + */ +private[optimization] object DateExpressions extends SupportedExpressions { + /** @inheritdoc */ + def apply(expr: Expression, checkChild: (Expression) â Boolean): Boolean = expr match { + case CurrentDate(None) â + true + + case CurrentTimestamp() â + true + + case DateAdd(startDate, days) â + checkChild(startDate) && checkChild(days) + + case DateDiff(date1, date2) â + checkChild(date1) && checkChild(date2) + + case DayOfMonth(date) â + checkChild(date) + + case DayOfYear(date) â + checkChild(date) + + case Hour(date, _) â + checkChild(date) + + case Minute(date, _) â + checkChild(date) + + case Month(date) â + checkChild(date) + + case ParseToDate(left, format, child) â + checkChild(left) && (format.isEmpty || checkChild(format.get)) && checkChild(child) + + case Quarter(date) â + checkChild(date) + + case Second(date, _) â + checkChild(date) + + case WeekOfYear(date) â + checkChild(date) + + case Year(date) â + checkChild(date) + + case _ â + false + } + + /** @inheritdoc */ + override def toString(expr: Expression, childToString: Expression â String, useQualifier: Boolean, + useAlias: Boolean): Option[String] = expr match { + case CurrentDate(_) â + Some(s"CURRENT_DATE()") + + case CurrentTimestamp() â + Some(s"CURRENT_TIMESTAMP()") + + case DateAdd(startDate, days) â + Some(s"CAST(DATEADD('DAY', ${childToString(days)}, ${childToString(startDate)}) AS DATE)") + + case DateDiff(date1, date2) â + Some(s"CAST(DATEDIFF('DAY', ${childToString(date1)}, ${childToString(date2)}) AS INT)") + + case DayOfMonth(date) â + Some(s"DAY_OF_MONTH(${childToString(date)})") + + case DayOfYear(date) â + Some(s"DAY_OF_YEAR(${childToString(date)})") + + case Hour(date, _) â + Some(s"HOUR(${childToString(date)})") + + case Minute(date, _) â + Some(s"MINUTE(${childToString(date)})") + 
+ case Month(date) ⇒ + Some(s"MONTH(${childToString(date)})") + + case ParseToDate(left, formatOption, _) ⇒ + formatOption match { + case Some(format) ⇒ + Some(s"PARSEDATETIME(${childToString(left)}, ${childToString(format)})") + case None ⇒ + Some(s"PARSEDATETIME(${childToString(left)})") + } + + case Quarter(date) ⇒ + Some(s"QUARTER(${childToString(date)})") + + case Second(date, _) ⇒ + Some(s"SECOND(${childToString(date)})") + + case WeekOfYear(date) ⇒ + Some(s"WEEK(${childToString(date)})") + + case Year(date) ⇒ + Some(s"YEAR(${childToString(date)})") + + case _ ⇒ + None + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala new file mode 100644 index 0000000..c5a7f34 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl.optimization + +import org.apache.ignite.spark.IgniteContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions.NamedExpression + +/** + * Class to store Ignite query info during optimization process. + * + * @param igniteContext IgniteContext. + * @param sqlContext SQLContext. + * @param cacheName Cache name. + * @param aliasIndex Iterator to generate indexes for auto-generated aliases. + * @param catalogTable CatalogTable from source relation. + */ +case class IgniteQueryContext( + igniteContext: IgniteContext, + sqlContext: SQLContext, + cacheName: String, + aliasIndex: Iterator[Int], + catalogTable: Option[CatalogTable] = None, + distributeJoin: Boolean = false +) { + /** + * @return Unique table alias. + */ + def uniqueTableAlias: String = "table" + aliasIndex.next + + /** + * @param col Column + * @return Unique column alias. + */ + def uniqueColumnAlias(col: NamedExpression): String = col.name + "_" + aliasIndex.next +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala new file mode 100644 index 0000000..dc05e95 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl.optimization + +import org.apache.spark.sql.catalyst.expressions.{Expression, _} + +/** + * Object to support math expressions. + */ +private[optimization] object MathExpressions extends SupportedExpressions { + /** @inheritdoc */ + def apply(expr: Expression, checkChild: (Expression) â Boolean): Boolean = expr match { + case Abs(child) â + checkChild(child) + + case Acos(child) â + checkChild(child) + + case Asin(child) â + checkChild(child) + + case Atan(child) â + checkChild(child) + + case Cos(child) â + checkChild(child) + + case Cosh(child) â + checkChild(child) + + case Sin(child) â + checkChild(child) + + case Sinh(child) â + checkChild(child) + + case Tan(child) â + checkChild(child) + + case Tanh(child) â + checkChild(child) + + case Atan2(left, right) â + checkChild(left) && checkChild(right) + + case BitwiseAnd(left, right) â + checkChild(left) && checkChild(right) + + case BitwiseOr(left, right) â + checkChild(left) && checkChild(right) + + case BitwiseXor(left, right) â + checkChild(left) && checkChild(right) + + case Ceil(child) â + checkChild(child) + + case ToDegrees(child) â + checkChild(child) + + case Exp(child) â + checkChild(child) + + case Floor(child) â + checkChild(child) + + case Log(child) â + checkChild(child) 
+ + case Log10(child) â + checkChild(child) + + case Logarithm(left, right) â + checkChild(left) && checkChild(right) + + case ToRadians(child) â + checkChild(child) + + case Sqrt(child) â + checkChild(child) + + case _: Pi â + true + + case _: EulerNumber â + true + + case Pow(left, right) â + checkChild(left) && checkChild(right) + + case Rand(child) â + checkChild(child) + + case Round(child, scale) â + checkChild(child) && checkChild(scale) + + case Signum(child) â + checkChild(child) + + case Remainder(left, right) â + checkChild(left) && checkChild(right) + + case Divide(left, right) â + checkChild(left) && checkChild(right) + + case Multiply(left, right) â + checkChild(left) && checkChild(right) + + case Subtract(left, right) â + checkChild(left) && checkChild(right) + + case Add(left, right) â + checkChild(left) && checkChild(right) + + case UnaryMinus(child) â + checkChild(child) + + case UnaryPositive(child) â + checkChild(child) + + case _ â false + } + + /** @inheritdoc */ + override def toString(expr: Expression, childToString: Expression â String, useQualifier: Boolean, + useAlias: Boolean): Option[String] = expr match { + case Abs(child) â + Some(s"ABS(${childToString(child)})") + + case Acos(child) â + Some(s"ACOS(${childToString(child)})") + + case Asin(child) â + Some(s"ASIN(${childToString(child)})") + + case Atan(child) â + Some(s"ATAN(${childToString(child)})") + + case Cos(child) â + Some(s"COS(${childToString(child)})") + + case Cosh(child) â + Some(s"COSH(${childToString(child)})") + + case Sin(child) â + Some(s"SIN(${childToString(child)})") + + case Sinh(child) â + Some(s"SINH(${childToString(child)})") + + case Tan(child) â + Some(s"TAN(${childToString(child)})") + + case Tanh(child) â + Some(s"TANH(${childToString(child)})") + + case Atan2(left, right) â + Some(s"ATAN2(${childToString(left)}, ${childToString(right)})") + + case BitwiseAnd(left, right) â + Some(s"BITAND(${childToString(left)}, ${childToString(right)})") + + case 
BitwiseOr(left, right) â + Some(s"BITOR(${childToString(left)}, ${childToString(right)})") + + case BitwiseXor(left, right) â + Some(s"BITXOR(${childToString(left)}, ${childToString(right)})") + + case Ceil(child) â + Some(s"CAST(CEIL(${childToString(child)}) AS LONG)") + + case ToDegrees(child) â + Some(s"DEGREES(${childToString(child)})") + + case Exp(child) â + Some(s"EXP(${childToString(child)})") + + case Floor(child) â + Some(s"CAST(FLOOR(${childToString(child)}) AS LONG)") + + case Log(child) â + Some(s"LOG(${childToString(child)})") + + case Log10(child) â + Some(s"LOG10(${childToString(child)})") + + case Logarithm(base, arg) â + childToString(base) match { + //Spark internally converts LN(XXX) to LOG(2.718281828459045, XXX). + //Because H2 doesn't have builtin function for a free base logarithm + //I want to prevent usage of log(a, b) = ln(a)/ln(b) when possible. + case "2.718281828459045" â + Some(s"LOG(${childToString(arg)})") + case "10" â + Some(s"LOG10(${childToString(arg)})") + case argStr â + Some(s"(LOG(${childToString(arg)})/LOG($argStr))") + } + + case ToRadians(child) â + Some(s"RADIANS(${childToString(child)})") + + case Sqrt(child) â + Some(s"SQRT(${childToString(child)})") + + case _: Pi â + Some("PI()") + + case _: EulerNumber â + Some("E()") + + case Pow(left, right) â + Some(s"POWER(${childToString(left)}, ${childToString(right)})") + + case Rand(child) â + Some(s"RAND(${childToString(child)})") + + case Round(child, scale) â + Some(s"ROUND(${childToString(child)}, ${childToString(scale)})") + + case Signum(child) â + Some(s"SIGN(${childToString(child)})") + + case Remainder(left, right) â + Some(s"${childToString(left)} % ${childToString(right)}") + + case Divide(left, right) â + Some(s"${childToString(left)} / ${childToString(right)}") + + case Multiply(left, right) â + Some(s"${childToString(left)} * ${childToString(right)}") + + case Subtract(left, right) â + Some(s"${childToString(left)} - ${childToString(right)}") + + case Add(left, 
right) â + Some(s"${childToString(left)} + ${childToString(right)}") + + case UnaryMinus(child) â + Some(s"-${childToString(child)}") + + case UnaryPositive(child) â + Some(s"+${childToString(child)}") + + case _ â + None + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala new file mode 100644 index 0000000..a1c9458 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala @@ -0,0 +1,180 @@ +package org.apache.ignite.spark.impl.optimization + +import java.text.SimpleDateFormat + +import org.apache.spark.sql.catalyst.expressions.{Expression, _} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types._ + +/** + * Object to support some 'simple' expressions like aliases. + */ +private[optimization] object SimpleExpressions extends SupportedExpressions { + /** @inheritdoc */ + override def apply(expr: Expression, checkChild: Expression â Boolean): Boolean = expr match { + case Literal(_, _) â + true + + case _: Attribute â + true + + case Alias(child, _) â + checkChild(child) + + case Cast(child, dataType, _) â + checkChild(child) && castSupported(from = child.dataType, to = dataType) + + case _ â + false + } + + /** @inheritdoc */ + override def toString(expr: Expression, childToString: Expression â String, useQualifier: Boolean, + useAlias: Boolean): Option[String] = expr match { + case l: Literal â l.dataType match { + case StringType â + Some("'" + l.value.toString + "'") + + case TimestampType â + l.value match { + //Internal representation of TimestampType is Long. 
+ //So we converting from internal spark representation to CAST call. + case date: Long â + Some(s"CAST('${timestampFormat.get.format(DateTimeUtils.toJavaTimestamp(date))}' AS TIMESTAMP)") + + case _ â + Some(l.value.toString) + } + + case DateType â + l.value match { + //Internal representation of DateType is Int. + //So we converting from internal spark representation to CAST call. + case days: Integer â + val date = new java.util.Date(DateTimeUtils.daysToMillis(days)) + + Some(s"CAST('${dateFormat.get.format(date)}' AS DATE)") + + case _ â + Some(l.value.toString) + } + + case _ â + if (l.value == null) + Some("null") + else + Some(l.value.toString) + } + + case ar: AttributeReference â + val name = + if (useQualifier) + ar.qualifier.map(_ + "." + ar.name).getOrElse(ar.name) + else + ar.name + + if (ar.metadata.contains(ALIAS) && !isAliasEqualColumnName(ar.metadata.getString(ALIAS), ar.name) && useAlias) + Some(aliasToString(name, ar.metadata.getString(ALIAS))) + else + Some(name) + + case Alias(child, name) â + if (useAlias) + Some(childToString(child)).map(aliasToString(_, name)) + else + Some(childToString(child)) + + case Cast(child, dataType, _) â + Some(s"CAST(${childToString(child)} AS ${toSqlType(dataType)})") + + case SortOrder(child, direction, _, _) â + Some(s"${childToString(child)}${if(direction==Descending) " DESC" else ""}") + + case _ â + None + } + + /** + * @param column Column name. + * @param alias Alias. + * @return SQL String for column with alias. + */ + private def aliasToString(column: String, alias: String): String = + if (isAliasEqualColumnName(alias, column)) + column + else if (alias.matches("[A-Za-z_][0-9A-Za-z_]*")) + s"$column AS $alias" + else + s"""$column AS "$alias"""" + + /** + * @param alias Alias. + * @param column Column. + * @return True if name equals to alias, false otherwise. 
+ */ + private def isAliasEqualColumnName(alias: String, column: String): Boolean = + alias.compareToIgnoreCase(column.replaceAll("'", "")) == 0 + + /** + * @param from From type conversion. + * @param to To type conversion. + * @return True if cast support for types, false otherwise. + */ + private def castSupported(from: DataType, to: DataType): Boolean = from match { + case BooleanType ⇒ + Set[DataType](BooleanType, StringType)(to) + + case ByteType ⇒ + Set(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to) + + case ShortType ⇒ + Set(ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to) + + case IntegerType ⇒ + Set(IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to) + + case LongType ⇒ + Set(LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to) + + case FloatType ⇒ + Set(FloatType, DoubleType, StringType, DecimalType(_, _))(to) + + case DoubleType ⇒ + Set(DoubleType, StringType, DecimalType(_, _))(to) + + case DecimalType() ⇒ + Set(StringType, DecimalType(_, _))(to) + + case DateType ⇒ + Set[DataType](DateType, StringType, LongType, TimestampType)(to) + + case TimestampType ⇒ + Set[DataType](TimestampType, DateType, StringType, LongType)(to) + + case StringType ⇒ + Set(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, + DecimalType(_, _), DateType, TimestampType, StringType)(to) + + case BinaryType ⇒ + false + + case ArrayType(_, _) ⇒ + false + } + + /** + * Date format built-in Ignite. + */ + private val dateFormat: ThreadLocal[SimpleDateFormat] = new ThreadLocal[SimpleDateFormat] { + override def initialValue(): SimpleDateFormat = + new SimpleDateFormat("yyyy-MM-dd") + } + + /** + * Timestamp format built-in Ignite. 
+ */ + private val timestampFormat: ThreadLocal[SimpleDateFormat] = new ThreadLocal[SimpleDateFormat] { + override def initialValue(): SimpleDateFormat = + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala new file mode 100644 index 0000000..1ecab2c --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl.optimization + +import org.apache.spark.sql.catalyst.expressions.{Expression, _} + +/** + * Object to support expressions to work with strings like `length` or `trim`. 
+ */ +private[optimization] object StringExpressions extends SupportedExpressions { + /** @inheritdoc */ + def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = expr match { + case Ascii(child) ⇒ + checkChild(child) + + case Length(child) ⇒ + checkChild(child) + + case Concat(children) ⇒ + children.forall(checkChild) + + case ConcatWs(children) ⇒ + children.forall(checkChild) + + case StringInstr(str, substr) ⇒ + checkChild(str) && checkChild(substr) + + case Lower(child) ⇒ + checkChild(child) + + case Upper(child) ⇒ + checkChild(child) + + case StringLocate(substr, str, start) ⇒ + checkChild(substr) && checkChild(str) && checkChild(start) + + case StringLPad(str, len, pad) ⇒ + checkChild(str) && checkChild(len) && checkChild(pad) + + case StringRPad(str, len, pad) ⇒ + checkChild(str) && checkChild(len) && checkChild(pad) + + case StringTrimLeft(child) ⇒ + checkChild(child) + + case StringTrimRight(child) ⇒ + checkChild(child) + + case StringTrim(child) ⇒ + checkChild(child) + + case RegExpReplace(subject, regexp, rep) ⇒ + checkChild(subject) && checkChild(regexp) && checkChild(rep) + + case StringRepeat(str, times) ⇒ + checkChild(str) && checkChild(times) + + case SoundEx(child) ⇒ + checkChild(child) + + case StringSpace(child) ⇒ + checkChild(child) + + case Substring(str, pos, len) ⇒ + checkChild(str) && checkChild(pos) && checkChild(len) + + case StringTranslate(str, strMatch, strReplace) ⇒ + checkChild(str) && checkChild(strMatch) && checkChild(strReplace) + + case _ ⇒ false + } + + /** @inheritdoc */ + override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean, + useAlias: Boolean): Option[String] = expr match { + case Ascii(child) ⇒ + Some(s"ASCII(${childToString(child)})") + + case Length(child) ⇒ + Some(s"CAST(LENGTH(${childToString(child)}) AS INTEGER)") + + case Concat(children) ⇒ + 
Some(s"CONCAT(${children.map(childToString(_)).mkString(", ")})") + + case ConcatWs(children) â + Some(s"CONCAT_WS(${children.map(childToString(_)).mkString(", ")})") + + case StringInstr(str, substr) â + Some(s"POSITION(${childToString(substr)}, ${childToString(str)})") + + case Lower(child) â + Some(s"LOWER(${childToString(child)})") + + case Upper(child) â + Some(s"UPPER(${childToString(child)})") + + case StringLocate(substr, str, start) â + Some(s"LOCATE(${childToString(substr)}, ${childToString(str)}, ${childToString(start)})") + + case StringLPad(str, len, pad) â + Some(s"LPAD(${childToString(str)}, ${childToString(len)}, ${childToString(pad)})") + + case StringRPad(str, len, pad) â + Some(s"RPAD(${childToString(str)}, ${childToString(len)}, ${childToString(pad)})") + + case StringTrimLeft(child) â + Some(s"LTRIM(${childToString(child)})") + + case StringTrimRight(child) â + Some(s"RTRIM(${childToString(child)})") + + case StringTrim(child) â + Some(s"TRIM(${childToString(child)})") + + case RegExpReplace(subject, regexp, rep) â + Some(s"REGEXP_REPLACE(${childToString(subject)}, ${childToString(regexp)}, ${childToString(rep)})") + + case StringRepeat(str, times) â + Some(s"REPEAT(${childToString(str)}, ${childToString(times)})") + + case SoundEx(child) â + Some(s"SOUND_EX(${childToString(child)})") + + case StringSpace(child) â + Some(s"SPACE(${childToString(child)})") + + case Substring(str, pos, len) â + Some(s"SUBSTR(${childToString(str)}, ${childToString(pos)}, ${childToString(len)})") + + case StringTranslate(str, strMatch, strReplace) â + Some(s"TRANSLATE(${childToString(str)}, ${childToString(strMatch)}, ${childToString(strReplace)})") + + case _ â + None + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala ---------------------------------------------------------------------- diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala new file mode 100644 index 0000000..f46eb72 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl.optimization + +import org.apache.spark.sql.catalyst.expressions.Expression + +/** + * Provides methods to work with Spark SQL expression that supported by Ignite SQL syntax. + */ +private[optimization] trait SupportedExpressions { + /** + * @param expr Expression to check. + * @param checkChild Closure to check child expression. + * @return True if `expr` are supported, false otherwise. + */ + def apply(expr: Expression, checkChild: (Expression) â Boolean): Boolean + + /** + * @param expr Expression to convert to string. + * @param childToString Closure to convert children expressions. + * @param useQualifier If true `expr` should be printed using qualifier. `Table1.id` for example. + * @param useAlias If true `expr` should be printed with alias. 
`name as person_name` for example. + * @return SQL representation of `expr` if it supported. `None` otherwise. + */ + def toString(expr: Expression, childToString: (Expression) â String, useQualifier: Boolean, + useAlias: Boolean): Option[String] +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala new file mode 100644 index 0000000..40e4e29 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.impl.optimization + +import org.apache.ignite.IgniteException +import org.apache.spark.sql.catalyst.expressions.{Coalesce, EqualTo, Expression, Greatest, If, IfNull, IsNotNull, IsNull, Least, Literal, NullIf, Nvl2} + +/** + * Object to support some built-in expressions like `nvl2` or `coalesce`. + */ +private[optimization] object SystemExpressions extends SupportedExpressions { + /** @inheritdoc */ + override def apply(expr: Expression, checkChild: Expression â Boolean): Boolean = expr match { + case Coalesce(children) â + children.forall(checkChild) + + case Greatest(children) â + children.forall(checkChild) + + case IfNull(left, right, _) â + checkChild(left) && checkChild(right) + + case Least(children) â + children.forall(checkChild) + + case NullIf(left, right, _) â + checkChild(left) && checkChild(right) + + case Nvl2(expr1, expr2, expr3, _) â + checkChild(expr1) && checkChild(expr2) && checkChild(expr3) + + case If(predicate, trueValue, falseValue) â + predicate match { + case IsNotNull(child) â + checkChild(child) && checkChild(trueValue) && checkChild(falseValue) + + case IsNull(child) â + checkChild(child) && checkChild(trueValue) && checkChild(falseValue) + + case EqualTo(left, right) â + trueValue match { + case Literal(null, _) â + (left == falseValue || right == falseValue) && checkChild(left) && checkChild(right) + + case _ â + false + } + + case _ â + false + } + + case _ â + false + } + + /** @inheritdoc */ + override def toString(expr: Expression, childToString: Expression â String, useQualifier: Boolean, + useAlias: Boolean): Option[String] = expr match { + case Coalesce(children) â + Some(s"COALESCE(${children.map(childToString(_)).mkString(", ")})") + + case Greatest(children) â + Some(s"GREATEST(${children.map(childToString(_)).mkString(", ")})") + + case IfNull(left, right, _) â + Some(s"IFNULL(${childToString(left)}, ${childToString(right)})") + + case Least(children) â + 
Some(s"LEAST(${children.map(childToString(_)).mkString(", ")})") + + case NullIf(left, right, _) â + Some(s"NULLIF(${childToString(left)}, ${childToString(right)})") + + case Nvl2(expr1, expr2, expr3, _) â + Some(s"NVL2(${childToString(expr1)}, ${childToString(expr2)}, ${childToString(expr3)})") + + case If(predicate, trueValue, falseValue) â + predicate match { + case IsNotNull(child) â + Some(s"NVL2(${childToString(child)}, ${childToString(trueValue)}, ${childToString(falseValue)})") + + case IsNull(child) â + Some(s"NVL2(${childToString(child)}, ${childToString(falseValue)}, ${childToString(trueValue)})") + + case EqualTo(left, right) â + trueValue match { + case Literal(null, _) â + if (left == falseValue) + Some(s"NULLIF(${childToString(left)}, ${childToString(right)})") + else if (right == falseValue) + Some(s"NULLIF(${childToString(right)}, ${childToString(left)})") + else + throw new IgniteException(s"Expression not supported. $expr") + + case _ â + throw new IgniteException(s"Expression not supported. $expr") + } + + case _ â + throw new IgniteException(s"Expression not supported. $expr") + } + + case _ â + None + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala new file mode 100644 index 0000000..7ae5e70 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl.optimization.accumulator + +import org.apache.ignite.IgniteException +import org.apache.ignite.spark.impl.optimization._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical.BinaryNode +import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, RightOuter} + +/** + * Accumulator to store information about join query. 
+ */ +private[apache] case class JoinSQLAccumulator( + igniteQueryContext: IgniteQueryContext, + left: QueryAccumulator, + right: QueryAccumulator, + joinType: JoinType, + outputExpressions: Seq[NamedExpression], + condition: Option[Expression], + leftAlias: Option[String], + rightAlias: Option[String], + distinct: Boolean = false, + where: Option[Seq[Expression]] = None, + groupBy: Option[Seq[Expression]] = None, + having: Option[Seq[Expression]] = None, + limit: Option[Expression] = None, + localLimit: Option[Expression] = None, + orderBy: Option[Seq[SortOrder]] = None +) extends BinaryNode with SelectAccumulator { + /** @inheritdoc */ + override def compileQuery(prettyPrint: Boolean = false): String = { + val delim = if (prettyPrint) "\n" else " " + val tab = if (prettyPrint) " " else "" + + var sql = s"SELECT$delim$tab" + + s"${fixQualifier(outputExpressions).map(exprToString(_, useQualifier = true)).mkString(", ")}$delim" + + s"FROM$delim$tab$compileJoinExpr" + + if (allFilters.nonEmpty) + sql += s"${delim}WHERE$delim$tab" + + s"${fixQualifier(allFilters).map(exprToString(_, useQualifier = true)).mkString(s" AND$delim$tab")}" + + if (groupBy.exists(_.nonEmpty)) + sql += s"${delim}GROUP BY " + + s"${fixQualifier(groupBy.get).map(exprToString(_, useQualifier = true)).mkString(s",$delim$tab")}" + + if (having.exists(_.nonEmpty)) + sql += s"${delim}HAVING " + + s"${fixQualifier(having.get).map(exprToString(_, useQualifier = true)).mkString(s" AND$delim$tab")}" + + if (orderBy.exists(_.nonEmpty)) + sql += s"${delim}ORDER BY " + + s"${fixQualifier(orderBy.get).map(exprToString(_, useQualifier = true)).mkString(s",$delim$tab")}" + + if (limit.isDefined) + sql += s" LIMIT ${exprToString(fixQualifier0(limit.get), useQualifier = true)}" + + sql + } + + /** + * @return Filters for this query. 
+ */ + private def allFilters: Seq[Expression] = { + val leftFilters = + if (isSimpleTableAcc(left)) + left.asInstanceOf[SingleTableSQLAccumulator].where.getOrElse(Seq.empty) + else + Seq.empty + + val rightFilters = + if (isSimpleTableAcc(right)) + right.asInstanceOf[SingleTableSQLAccumulator].where.getOrElse(Seq.empty) + else Seq.empty + + where.getOrElse(Seq.empty) ++ leftFilters ++ rightFilters + } + + /** + * @return `table1 LEFT JOIN (SELECT....FROM...) table2` part of join query. + */ + private def compileJoinExpr: String = { + val leftJoinSql = + if (isSimpleTableAcc(left)) + left.asInstanceOf[SingleTableSQLAccumulator].table.get + else + s"(${left.compileQuery()}) ${leftAlias.get}" + + val rightJoinSql = { + val leftTableName = + if (isSimpleTableAcc(left)) + left.qualifier + else + leftAlias.get + + if (isSimpleTableAcc(right)) { + val rightTableName = right.asInstanceOf[SingleTableSQLAccumulator].table.get + + if (leftTableName == rightTableName) + s"$rightTableName as ${rightAlias.get}" + else + rightTableName + } else + s"(${right.compileQuery()}) ${rightAlias.get}" + } + + s"$leftJoinSql $joinTypeSQL $rightJoinSql" + + s"${condition.map(expr â s" ON ${exprToString(fixQualifier0(expr), useQualifier = true)}").getOrElse("")}" + } + + /** + * @return SQL string representing specific join type. + */ + private def joinTypeSQL = joinType match { + case Inner â + "JOIN" + case LeftOuter â + "LEFT JOIN" + + case RightOuter â + "RIGHT JOIN" + + case _ â + throw new IgniteException(s"Unsupported join type $joinType") + } + + /** + * Changes table qualifier in case of embedded query. + * + * @param exprs Expressions to fix. + * @tparam T type of input expression. + * @return copy of `exprs` with fixed qualifier. + */ + private def fixQualifier[T <: Expression](exprs: Seq[T]): Seq[T] = + exprs.map(fixQualifier0) + + /** + * Changes table qualifier for single expression. + * + * @param expr Expression to fix. + * @tparam T type of input expression. 
+ * @return copy of `expr` with fixed qualifier. + */ + private def fixQualifier0[T <: Expression](expr: T): T = expr match { + case attr: AttributeReference â + attr.withQualifier(Some(findQualifier(attr))).asInstanceOf[T] + + case _ â + expr.withNewChildren(fixQualifier(expr.children)).asInstanceOf[T] + } + + /** + * Find right qualifier for a `attr`. + * + * @param attr Attribute to fix qualifier in + * @return Right qualifier for a `attr` + */ + private def findQualifier(attr: AttributeReference): String = { + val leftTableName = + if (isSimpleTableAcc(left)) + left.qualifier + else + leftAlias.get + + if (left.outputExpressions.exists(_.exprId == attr.exprId)) + leftTableName + else if (isSimpleTableAcc(right) && right.qualifier != leftTableName) + right.qualifier + else + rightAlias.get + } + + /** @inheritdoc */ + override def simpleString: String = + s"JoinSQLAccumulator(joinType: $joinType, columns: $outputExpressions, condition: $condition)" + + /** @inheritdoc */ + override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): SelectAccumulator = copy(outputExpressions= outputExpressions) + + /** @inheritdoc */ + override def withDistinct(distinct: Boolean): JoinSQLAccumulator = copy(distinct = distinct) + + /** @inheritdoc */ + override def withWhere(where: Seq[Expression]): JoinSQLAccumulator = copy(where = Some(where)) + + /** @inheritdoc */ + override def withGroupBy(groupBy: Seq[Expression]): JoinSQLAccumulator = copy(groupBy = Some(groupBy)) + + /** @inheritdoc */ + override def withHaving(having: Seq[Expression]): JoinSQLAccumulator = copy(having = Some(having)) + + /** @inheritdoc */ + override def withLimit(limit: Expression): JoinSQLAccumulator = copy(limit = Some(limit)) + + /** @inheritdoc */ + override def withLocalLimit(localLimit: Expression): JoinSQLAccumulator = copy(localLimit = Some(localLimit)) + + /** @inheritdoc */ + override def withOrderBy(orderBy: Seq[SortOrder]): JoinSQLAccumulator = copy(orderBy = Some(orderBy)) + 
+ /** @inheritdoc */ + override def output: Seq[Attribute] = outputExpressions.map(toAttributeReference(_, Seq.empty)) + + /** @inheritdoc */ + override lazy val qualifier: String = igniteQueryContext.uniqueTableAlias +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala new file mode 100644 index 0000000..133d355 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.impl.optimization.accumulator + +import org.apache.ignite.spark.impl.optimization.IgniteQueryContext +import org.apache.spark.sql.catalyst.expressions.{NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +/** + * Generic query info accumulator interface. + */ +private[apache] trait QueryAccumulator extends LogicalPlan { + /** + * @return Ignite query context. + */ + def igniteQueryContext: IgniteQueryContext + + /** + * @return Generated output. + */ + def outputExpressions: Seq[NamedExpression] + + /** + * @return Ordering info. + */ + def orderBy: Option[Seq[SortOrder]] + + /** + * @param outputExpressions New output expressions. + * @return Copy of this accumulator with new output. + */ + def withOutputExpressions(outputExpressions: Seq[NamedExpression]): QueryAccumulator + + /** + * @param orderBy New ordering. + * @return Copy of this accumulator with new order. + */ + def withOrderBy(orderBy: Seq[SortOrder]): QueryAccumulator + + /** + * @param prettyPrint If true human readable query will be generated. + * @return SQL query. + */ + def compileQuery(prettyPrint: Boolean = false): String + + /** + * @return Qualifier that should be use to select data from this accumulator. + */ + def qualifier: String + + /** + * All expressions are resolved when extra optimization executed. 
+ */ + override lazy val resolved = true +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala new file mode 100644 index 0000000..c1db6f9 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl.optimization.accumulator + +import org.apache.spark.sql.catalyst.expressions.Expression + +/** + * Generic interface for a SELECT query. + */ +private[apache] trait SelectAccumulator extends QueryAccumulator { + /** + * @return Expression for HAVING part of query. + */ + def having: Option[Seq[Expression]] + + /** + * @return Expression for WHERE part of query. 
+ */ + def where: Option[Seq[Expression]] + + /** + * @return Expression for GROUP BY part of query. + */ + def groupBy: Option[Seq[Expression]] + + /** + * @return Copy of this accumulator with `distinct` flag. + */ + def withDistinct(distinct: Boolean): SelectAccumulator + + /** + * @return Copy of this accumulator with `where` expressions. + */ + def withWhere(where: Seq[Expression]): SelectAccumulator + + /** + * @return Copy of this accumulator with `groupBy` expressions. + */ + def withGroupBy(groupBy: Seq[Expression]): SelectAccumulator + + /** + * @return Copy of this accumulator with `having` expressions. + */ + def withHaving(having: Seq[Expression]): SelectAccumulator + + /** + * @return Copy of this accumulator with `limit` expression. + */ + def withLimit(limit: Expression): SelectAccumulator + + /** + * @return Copy of this accumulator with `localLimit` expression. + */ + def withLocalLimit(localLimit: Expression): SelectAccumulator +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala new file mode 100644 index 0000000..47035b9 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl.optimization.accumulator + +import org.apache.ignite.IgniteException +import org.apache.ignite.spark.impl.optimization._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +/** + * Class for accumulating parts of SQL query to a single Ignite table. + * + * See <a href="http://www.h2database.com/html/grammar.html#select">select syntax of H2</a>. 
+ */ +private[apache] case class SingleTableSQLAccumulator( + igniteQueryContext: IgniteQueryContext, + table: Option[String], + tableExpression: Option[(QueryAccumulator, String)], + outputExpressions: Seq[NamedExpression], + distinct: Boolean = false, + all: Boolean = false, + where: Option[Seq[Expression]] = None, + groupBy: Option[Seq[Expression]] = None, + having: Option[Seq[Expression]] = None, + limit: Option[Expression] = None, + localLimit: Option[Expression] = None, + orderBy: Option[Seq[SortOrder]] = None +) extends SelectAccumulator { + /** @inheritdoc */ + override def compileQuery(prettyPrint: Boolean = false): String = { + val delim = if (prettyPrint) "\n" else " " + val tab = if (prettyPrint) " " else "" + + var sql = s"SELECT$delim$tab${outputExpressions.map(exprToString(_)).mkString(", ")}${delim}" + + s"FROM$delim$tab$compiledTableExpression" + + if (where.exists(_.nonEmpty)) + sql += s"${delim}WHERE$delim$tab${where.get.map(exprToString(_)).mkString(s" AND$delim$tab")}" + + if (groupBy.exists(_.nonEmpty)) + sql += s"${delim}GROUP BY ${groupBy.get.map(exprToString(_)).mkString(s",$delim$tab")}" + + if (having.exists(_.nonEmpty)) + sql += s"${delim}HAVING ${having.get.map(exprToString(_)).mkString(s" AND$delim$tab")}" + + if (orderBy.exists(_.nonEmpty)) + sql += s"${delim}ORDER BY ${orderBy.get.map(exprToString(_)).mkString(s",$delim$tab")}" + + if (limit.isDefined) + sql += s" LIMIT ${limit.map(exprToString(_)).get}" + + sql + } + + /** + * @return From table SQL query part. 
+ */ + private def compiledTableExpression: String = table match { + case Some(tableName) ⇒ + tableName + + case None ⇒ tableExpression match { + case Some((acc, alias)) ⇒ + s"(${acc.compileQuery()}) $alias" + + case None ⇒ + throw new IgniteException("Unknown table.") + } + } + + /** @inheritdoc */ + override def simpleString: String = + s"IgniteSQLAccumulator(table: $table, columns: $outputExpressions, distinct: $distinct, all: $all, " + + s"where: $where, groupBy: $groupBy, having: $having, limit: $limit, orderBy: $orderBy)" + + /** @inheritdoc */ + override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): SelectAccumulator = + copy(outputExpressions= outputExpressions) + + /** @inheritdoc */ + override def withDistinct(distinct: Boolean): SingleTableSQLAccumulator = copy(distinct = distinct) + + /** @inheritdoc */ + override def withWhere(where: Seq[Expression]): SingleTableSQLAccumulator = copy(where = Some(where)) + + /** @inheritdoc */ + override def withGroupBy(groupBy: Seq[Expression]): SingleTableSQLAccumulator = copy(groupBy = Some(groupBy)) + + /** @inheritdoc */ + override def withHaving(having: Seq[Expression]): SingleTableSQLAccumulator = copy(having = Some(having)) + + /** @inheritdoc */ + override def withLimit(limit: Expression): SingleTableSQLAccumulator = copy(limit = Some(limit)) + + /** @inheritdoc */ + override def withLocalLimit(localLimit: Expression): SingleTableSQLAccumulator = copy(localLimit = Some(localLimit)) + + /** @inheritdoc */ + override def withOrderBy(orderBy: Seq[SortOrder]): SingleTableSQLAccumulator = copy(orderBy = Some(orderBy)) + + /** @inheritdoc */ + override def output: Seq[Attribute] = outputExpressions.map(toAttributeReference(_, Seq.empty)) + + /** @inheritdoc */ + override def qualifier: String = table.getOrElse(tableExpression.get._2) + + /** @inheritdoc */ + override def children: Seq[LogicalPlan] = tableExpression.map(te ⇒ Seq(te._1)).getOrElse(Nil) +}
