http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala new file mode 100644 index 0000000..723e17a --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl.optimization.accumulator + +import org.apache.ignite.spark.impl.optimization.{IgniteQueryContext, exprToString, toAttributeReference} +import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder} + +/** + * Accumulator to store info about a UNION query. + */ +private[apache] case class UnionSQLAccumulator( + igniteQueryContext: IgniteQueryContext, + children: Seq[QueryAccumulator], + outputExpressions: Seq[NamedExpression], + orderBy: Option[Seq[SortOrder]] = None +) extends QueryAccumulator { + /** @inheritdoc */ + override def compileQuery(prettyPrint: Boolean = false): String = { + val delim = if (prettyPrint) "\n" else " " + val tab = if (prettyPrint) " " else "" + + val query = children.map(_.compileQuery(prettyPrint)).mkString(s"${delim}UNION$delim") + + orderBy match { + case Some(sortOrders) ⇒ + query + s"${delim}ORDER BY ${sortOrders.map(exprToString(_)).mkString(s",$delim$tab")}" + + case None ⇒ query + } + } + + /** @inheritdoc */ + override def simpleString: String = + s"UnionSQLAccumulator(orderBy: ${orderBy.map(_.map(exprToString(_)).mkString(", ")).getOrElse("[]")})" + + /** @inheritdoc */ + override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): QueryAccumulator = + copy(outputExpressions = outputExpressions) + + /** @inheritdoc */ + override def withOrderBy(orderBy: Seq[SortOrder]): QueryAccumulator = copy(orderBy = Some(orderBy)) + + /** @inheritdoc */ + override def output: Seq[Attribute] = outputExpressions.map(toAttributeReference(_, Seq.empty)) + + /** @inheritdoc */ + override lazy val qualifier: String = igniteQueryContext.uniqueTableAlias +}
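For illustration, a minimal sketch of the SQL this accumulator emits; `ctx`, `child1`, `child2` and `sortOrders` are hypothetical names (not from the patch), assuming the children are QueryAccumulators whose compileQuery() yields plain SELECT statements:

    // Hypothetical children: compileQuery() returns "SELECT id FROM t1" / "SELECT id FROM t2".
    val union = UnionSQLAccumulator(ctx, Seq(child1, child2), child1.outputExpressions)

    union.compileQuery()
    // "SELECT id FROM t1 UNION SELECT id FROM t2"

    // withOrderBy returns a copy whose compiled query has an ORDER BY clause appended:
    union.withOrderBy(sortOrders).compileQuery()
    // "SELECT id FROM t1 UNION SELECT id FROM t2 ORDER BY ID"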
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala new file mode 100644 index 0000000..4e168f4 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl + + +import org.apache.ignite.IgniteException +import org.apache.ignite.spark.impl.optimization.accumulator.{QueryAccumulator, SingleTableSQLAccumulator} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression} +import org.apache.spark.sql.types._ + +import scala.annotation.tailrec + +/** + */ +package object optimization { + /** + * Constant to store the alias in column metadata. + */ + private[optimization] val ALIAS: String = "alias" + + /** + * All `SupportedExpressions` implementations. + */ + private val SUPPORTED_EXPRESSIONS: List[SupportedExpressions] = List( + SimpleExpressions, + SystemExpressions, + AggregateExpressions, + ConditionExpressions, + DateExpressions, + MathExpressions, + StringExpressions + ) + + /** + * @param expr Expression. + * @param useQualifier If true, outputs attributes of `expr` with a qualifier. + * @param useAlias If true, outputs `expr` with an alias. + * @return String representation of the expression. + */ + def exprToString(expr: Expression, useQualifier: Boolean = false, useAlias: Boolean = true): String = { + @tailrec + def exprToString0(expr: Expression, supportedExpressions: List[SupportedExpressions]): Option[String] = + if (supportedExpressions.nonEmpty) { + val exprStr = supportedExpressions.head.toString( + expr, + exprToString(_, useQualifier, useAlias = false), + useQualifier, + useAlias) + + exprStr match { + case res: Some[String] ⇒ + res + case None ⇒ + exprToString0(expr, supportedExpressions.tail) + } + } + else + None + + exprToString0(expr, SUPPORTED_EXPRESSIONS) match { + case Some(str) ⇒ str + + case None ⇒ + throw new IgniteException("Unsupported expression " + expr) + } + } + + /** + * @param exprs Expressions to check. + * @return True if `exprs` contains only allowed (i.e. can be pushed down to Ignite) expressions, false otherwise. + */ + def exprsAllowed(exprs: Seq[Expression]): Boolean = + exprs.forall(exprsAllowed) + + /** + * @param expr Expression to check. + * @return True if `expr` is allowed (i.e.
can be pushed down to Ignite), false otherwise. + * + */ + def exprsAllowed(expr: Expression): Boolean = + SUPPORTED_EXPRESSIONS.exists(_(expr, exprsAllowed)) + + /** + * Converts `input` into an `AttributeReference`. + * + * @param input Expression to convert. + * @param existingOutput Existing output. + * @param exprId Optional expression ID to use. + * @param alias Optional alias for the result. + * @return Converted expression. + */ + def toAttributeReference(input: Expression, existingOutput: Seq[NamedExpression], exprId: Option[ExprId] = None, + alias: Option[String] = None): AttributeReference = { + + input match { + case attr: AttributeReference ⇒ + val toCopy = existingOutput.find(_.exprId == attr.exprId).getOrElse(attr) + + AttributeReference( + name = toCopy.name, + dataType = toCopy.dataType, + metadata = alias + .map(new MetadataBuilder().withMetadata(toCopy.metadata).putString(ALIAS, _).build()) + .getOrElse(toCopy.metadata) + )(exprId = exprId.getOrElse(toCopy.exprId), qualifier = toCopy.qualifier, isGenerated = toCopy.isGenerated) + + case a: Alias ⇒ + toAttributeReference(a.child, existingOutput, Some(a.exprId), Some(alias.getOrElse(a.name))) + + case agg: AggregateExpression ⇒ + agg.aggregateFunction match { + case c: Count ⇒ + if (agg.isDistinct) + AttributeReference( + name = s"COUNT(DISTINCT ${c.children.map(exprToString(_)).mkString(" ")})", + dataType = LongType, + metadata = alias + .map(new MetadataBuilder().putString(ALIAS, _).build()) + .getOrElse(Metadata.empty) + )(exprId = exprId.getOrElse(agg.resultId)) + else + AttributeReference( + name = s"COUNT(${c.children.map(exprToString(_)).mkString(" ")})", + dataType = LongType, + metadata = alias + .map(new MetadataBuilder().putString(ALIAS, _).build()) + .getOrElse(Metadata.empty) + )(exprId = exprId.getOrElse(agg.resultId)) + + case _ ⇒ + toAttributeReference(agg.aggregateFunction, existingOutput, Some(exprId.getOrElse(agg.resultId)), alias) + } + + case ne: NamedExpression ⇒ + AttributeReference( + name = exprToString(input), + dataType = input.dataType, + metadata = alias + .map(new MetadataBuilder().withMetadata(ne.metadata).putString(ALIAS, _).build()) + .getOrElse(Metadata.empty) + )(exprId = exprId.getOrElse(ne.exprId)) + + case _ if exprsAllowed(input) ⇒ + AttributeReference( + name = exprToString(input), + dataType = input.dataType, + metadata = alias + .map(new MetadataBuilder().putString(ALIAS, _).build()) + .getOrElse(Metadata.empty) + )(exprId = exprId.getOrElse(NamedExpression.newExprId)) + + case _ ⇒ + throw new IgniteException(s"Unsupported column expression $input") + } + } + + /** + * @param dataType Spark data type. + * @return SQL data type. + */ + def toSqlType(dataType: DataType): String = dataType match { + case BooleanType ⇒ "BOOLEAN" + case IntegerType ⇒ "INT" + case ByteType ⇒ "TINYINT" + case ShortType ⇒ "SMALLINT" + case LongType ⇒ "BIGINT" + case DecimalType() ⇒ "DECIMAL" + case DoubleType ⇒ "DOUBLE" + case FloatType ⇒ "REAL" + case DateType ⇒ "DATE" + case TimestampType ⇒ "TIMESTAMP" + case StringType ⇒ "VARCHAR" + case BinaryType ⇒ "BINARY" + case ArrayType(_, _) ⇒ "ARRAY" + case _ ⇒ + throw new IgniteException(s"$dataType not supported!") + } + + /** + * @param expr Expression. + * @return True if the expression or some of its children is an AggregateExpression, false otherwise.
+ */ + def hasAggregateInside(expr: Expression): Boolean = { + def hasAggregateInside0(expr: Expression): Boolean = expr match { + case AggregateExpression(_, _, _, _) ⇒ + true + + case e: Expression ⇒ + e.children.exists(hasAggregateInside0) + } + + hasAggregateInside0(expr) + } + + /** + * Checks if `acc` represents a simple query. + * Simple means a `SELECT ... FROM table WHERE ...`-like query, + * without aggregation, limits, ordering, or embedded select expressions. + * + * @param acc Accumulator to check. + * @return True if the accumulator stores simple query info, false otherwise. + */ + def isSimpleTableAcc(acc: QueryAccumulator): Boolean = acc match { + case acc: SingleTableSQLAccumulator if acc.table.isDefined ⇒ + acc.groupBy.isEmpty && + acc.localLimit.isEmpty && + acc.orderBy.isEmpty && + !acc.distinct && + !acc.outputExpressions.exists(hasAggregateInside) + + case _ ⇒ + false + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala index 4634a97..6502c0f 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala @@ -19,12 +19,15 @@ package org.apache.ignite.spark import org.apache.commons.lang.StringUtils.equalsIgnoreCase import org.apache.ignite.{Ignite, IgniteException, IgniteState, Ignition} -import org.apache.ignite.cache.QueryEntity +import org.apache.ignite.cache.{CacheMode, QueryEntity} +import org.apache.ignite.cluster.ClusterNode import org.apache.ignite.configuration.CacheConfiguration import org.apache.ignite.internal.util.lang.GridFunc.contains +import org.apache.spark.Partition import org.apache.spark.sql.catalyst.catalog.SessionCatalog import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer package object impl { /** @@ -129,4 +132,47 @@ package object impl { */ def isKeyColumn(table: QueryEntity, column: String): Boolean = contains(table.getKeyFields, column) || equalsIgnoreCase(table.getKeyFieldName, column) + + /** + * Computes Spark partitions for a given cache. + * + * @param ic Ignite context.
+ * @param cacheName Cache name. + * @return Array of IgniteDataFramePartition. + */ + def calcPartitions(ic: IgniteContext, cacheName: String): Array[Partition] = { + val cache = ic.ignite().cache[Any, Any](cacheName) + + val ccfg = cache.getConfiguration(classOf[CacheConfiguration[Any, Any]]) + + if (ccfg.getCacheMode == CacheMode.REPLICATED) { + val serverNodes = ic.ignite().cluster().forCacheNodes(cacheName).forServers().nodes() + + Array(IgniteDataFramePartition(0, serverNodes.head, Stream.from(0).take(1024).toList)) + } + else { + val aff = ic.ignite().affinity(cacheName) + + val parts = aff.partitions() + + val nodesToParts = (0 until parts).foldLeft(Map[ClusterNode, ArrayBuffer[Int]]()) { + case (nodeToParts, ignitePartIdx) ⇒ + val primary = aff.mapPartitionToPrimaryAndBackups(ignitePartIdx).head + + if (nodeToParts.contains(primary)) { + nodeToParts(primary) += ignitePartIdx + + nodeToParts + } + else + nodeToParts + (primary → ArrayBuffer[Int](ignitePartIdx)) + } + + val partitions = nodesToParts.zipWithIndex.map { case ((node, nodesParts), i) ⇒ + IgniteDataFramePartition(i, node, nodesParts.toList) + } + + partitions.toArray + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala new file mode 100644 index 0000000..b23cd6f --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.ignite + +import org.apache.ignite.IgniteException +import org.apache.ignite.spark.impl.{IgniteSQLAccumulatorRelation, IgniteSQLRelation, sqlCacheName} +import org.apache.ignite.spark.impl.optimization.{accumulator, _} +import org.apache.ignite.spark.impl.optimization.accumulator._ +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation + +/** + * Query plan optimization for Ignite-based queries.
+ */ +object IgniteOptimization extends Rule[LogicalPlan] with Logging { + /** @inheritdoc */ + override def apply(plan: LogicalPlan): LogicalPlan = { + logDebug("") + logDebug("== Plan Before Ignite Operator Push Down ==") + logDebug(plan.toString()) + + val transformed = fixAmbiguousOutput(pushDownOperators(plan)) + + logDebug("") + logDebug("== Plan After Ignite Operator Push Down ==") + logDebug(transformed.toString()) + + makeIgniteAccRelation(transformed) + } + + /** + * Changes the query plan by accumulating query parts supported by Ignite into a `QueryAccumulator`. + * + * @param plan Query plan. + * @return Transformed plan. + */ + private def pushDownOperators(plan: LogicalPlan): LogicalPlan = { + val aliasIndexIterator = Stream.from(1).iterator + + //Flag to indicate that some step was skipped due to an unsupported expression. + //When it is true we have to skip the entire transformation of the higher-level nodes. + var stepSkipped = true + + //Applying optimization rules to tree nodes from the bottom up. + plan.transformUp { + //We found a basic node to transform. + //We create a new accumulator and go to the upper layers. + case LogicalRelation(igniteSqlRelation: IgniteSQLRelation[_, _], output, _catalogTable) ⇒ + //Clear the flag to optimize each statement separately. + stepSkipped = false + + val igniteQueryContext = IgniteQueryContext( + igniteContext = igniteSqlRelation.ic, + sqlContext = igniteSqlRelation.sqlContext, + catalogTable = _catalogTable, + aliasIndex = aliasIndexIterator, + cacheName = + sqlCacheName(igniteSqlRelation.ic.ignite(), igniteSqlRelation.tableName) + .getOrElse(throw new IgniteException("Unknown table"))) + + //LogicalRelation is the bottom-most TreeNode in a LogicalPlan. + //We replace it with an accumulator. + //We push all supported SQL operators into it on the higher tree levels. + SingleTableSQLAccumulator( + igniteQueryContext = igniteQueryContext, + table = Some(igniteSqlRelation.tableName), + tableExpression = None, + outputExpressions = output.map(attr ⇒ attr.withQualifier(Some(igniteSqlRelation.tableName)))) + + case project: Project if !stepSkipped && exprsAllowed(project.projectList) ⇒ + //The Project layer just changes the output of the current query.
project.child match { + case acc: SelectAccumulator ⇒ + acc.withOutputExpressions( + substituteExpressions(project.projectList, acc.outputExpressions)) + + case _ ⇒ + throw new IgniteException("stepSkipped == true but child is not SelectAccumulator") + } + + case sort: Sort if !stepSkipped && isSortPushDownAllowed(sort.order, sort.global) ⇒ + sort.child match { + case acc: QueryAccumulator ⇒ + acc.withOrderBy(sort.order) + + case _ ⇒ + throw new IgniteException("stepSkipped == true but child is not SelectAccumulator") + } + + case filter: Filter if !stepSkipped && exprsAllowed(filter.condition) ⇒ + + filter.child match { + case acc: SelectAccumulator ⇒ + if (hasAggregateInside(filter.condition) || acc.groupBy.isDefined) + acc.withHaving(acc.having.getOrElse(Nil) :+ filter.condition) + else + acc.withWhere(acc.where.getOrElse(Nil) :+ filter.condition) + + case _ ⇒ + throw new IgniteException("stepSkipped == true but child is not SelectAccumulator") + } + + case agg: Aggregate + if !stepSkipped && exprsAllowed(agg.groupingExpressions) && exprsAllowed(agg.aggregateExpressions) ⇒ + + agg.child match { + case acc: SelectAccumulator ⇒ + if (acc.groupBy.isDefined) { + val tableAlias = acc.igniteQueryContext.uniqueTableAlias + + accumulator.SingleTableSQLAccumulator( + igniteQueryContext = acc.igniteQueryContext, + table = None, + tableExpression = Some((acc, tableAlias)), + outputExpressions = agg.aggregateExpressions) + } + else + acc + .withGroupBy(agg.groupingExpressions) + .withOutputExpressions( + substituteExpressions(agg.aggregateExpressions, acc.outputExpressions)) + + case acc: QueryAccumulator ⇒ + val tableAlias = acc.igniteQueryContext.uniqueTableAlias + + accumulator.SingleTableSQLAccumulator( + igniteQueryContext = acc.igniteQueryContext, + table = None, + tableExpression = Some((acc, tableAlias)), + outputExpressions = agg.aggregateExpressions) + + case _ ⇒ + throw new IgniteException("stepSkipped == true but child is not SelectAccumulator") + } + + case limit: LocalLimit if !stepSkipped && exprsAllowed(limit.limitExpr) ⇒ + limit.child match { + case acc: SelectAccumulator ⇒ + acc.withLocalLimit(limit.limitExpr) + + case _ ⇒ + throw new IgniteException("stepSkipped == true but child is not SelectAccumulator") + } + + case limit: GlobalLimit if !stepSkipped && exprsAllowed(limit.limitExpr) ⇒ + limit.child.transformUp { + case acc: SelectAccumulator ⇒ + acc.withLimit(limit.limitExpr) + + case _ ⇒ + throw new IgniteException("stepSkipped == true but child is not SelectAccumulator") + } + + case union: Union if !stepSkipped && isAllChildrenOptimized(union.children) ⇒ + val first = union.children.head.asInstanceOf[QueryAccumulator] + + val subQueries = union.children.map(_.asInstanceOf[QueryAccumulator]) + + UnionSQLAccumulator( + first.igniteQueryContext, + subQueries, + subQueries.head.output) + + case join: Join + if !stepSkipped && isAllChildrenOptimized(Seq(join.left, join.right)) && + join.condition.forall(exprsAllowed) ⇒ + + val left = join.left.asInstanceOf[QueryAccumulator] + + val (leftOutput, leftAlias) = + if (!isSimpleTableAcc(left)) { + val tableAlias = left.igniteQueryContext.uniqueTableAlias + + (left.output, Some(tableAlias)) + } + else + (left.output, None) + + val right = join.right.asInstanceOf[QueryAccumulator] + + val (rightOutput, rightAlias) = + if (!isSimpleTableAcc(right) || + leftAlias.getOrElse(left.qualifier) == right.qualifier) { + val tableAlias = right.igniteQueryContext.uniqueTableAlias + + (right.output, Some(tableAlias)) + } + else + (right.output,
None) + + JoinSQLAccumulator( + left.igniteQueryContext, + left, + right, + join.joinType, + leftOutput ++ rightOutput, + join.condition, + leftAlias, + rightAlias) + + case unknown ⇒ + stepSkipped = true + + unknown + } + } + + /** + * Changes qualifiers for ambiguous column names. + * + * @param plan Query plan. + * @return Transformed plan. + */ + private def fixAmbiguousOutput(plan: LogicalPlan): LogicalPlan = plan.transformDown { + case acc: SingleTableSQLAccumulator if acc.children.exists(_.isInstanceOf[JoinSQLAccumulator]) ⇒ + val fixedChildOutput = + fixAmbiguousOutput(acc.tableExpression.get._1.outputExpressions, acc.igniteQueryContext) + + val newOutput = substituteExpressions(acc.outputExpressions, fixedChildOutput, changeOnlyName = true) + + acc.copy( + outputExpressions = newOutput, + where = acc.where.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true)), + groupBy = acc.groupBy.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true)), + having = acc.having.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true)), + limit = acc.limit.map( + substituteExpression(_, fixedChildOutput, changeOnlyName = true)), + localLimit = acc.localLimit.map( + substituteExpression(_, fixedChildOutput, changeOnlyName = true)), + orderBy = acc.orderBy.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true))) + + case acc: JoinSQLAccumulator + if acc.left.isInstanceOf[JoinSQLAccumulator] || acc.right.isInstanceOf[JoinSQLAccumulator] ⇒ + val leftFixed = acc.left match { + case leftJoin: JoinSQLAccumulator ⇒ + val fixedChildOutput = fixAmbiguousOutput(acc.left.outputExpressions, acc.igniteQueryContext) + + val newOutput = + substituteExpressions(acc.outputExpressions, fixedChildOutput, changeOnlyName = true) + + acc.copy( + outputExpressions = newOutput, + left = leftJoin.copy(outputExpressions = fixedChildOutput), + condition = acc.condition.map( + substituteExpression(_, fixedChildOutput, changeOnlyName = true)), + where = acc.where.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true)), + groupBy = acc.groupBy.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true)), + having = acc.having.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true)), + limit = acc.limit.map( + substituteExpression(_, fixedChildOutput, changeOnlyName = true)), + localLimit = acc.localLimit.map( + substituteExpression(_, fixedChildOutput, changeOnlyName = true)), + orderBy = acc.orderBy.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true))) + + case _ ⇒ acc + } + + val fixed = leftFixed.right match { + case rightJoin: JoinSQLAccumulator ⇒ + val fixedChildOutput = + fixAmbiguousOutput(leftFixed.outputExpressions, leftFixed.igniteQueryContext) + + val newOutput = substituteExpressions(leftFixed.outputExpressions, fixedChildOutput) + + leftFixed.copy( + outputExpressions = newOutput, + right = rightJoin.copy(outputExpressions = fixedChildOutput), + condition = acc.condition.map( + substituteExpression(_, fixedChildOutput, changeOnlyName = true)), + where = acc.where.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true)), + groupBy = acc.groupBy.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true)), + having = acc.having.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true)), + limit = acc.limit.map( + substituteExpression(_, fixedChildOutput, changeOnlyName = true)), + localLimit = acc.localLimit.map( +
substituteExpression(_, fixedChildOutput, changeOnlyName = true)), + orderBy = acc.orderBy.map( + substituteExpressions(_, fixedChildOutput, changeOnlyName = true))) + + case _ ⇒ leftFixed + } + + fixed.copy( + condition = acc.condition.map( + substituteExpression(_, acc.outputExpressions, changeOnlyName = true)), + where = acc.where.map( + substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)), + groupBy = acc.groupBy.map( + substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)), + having = acc.having.map( + substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)), + limit = acc.limit.map( + substituteExpression(_, acc.outputExpressions, changeOnlyName = true)), + localLimit = acc.localLimit.map( + substituteExpression(_, acc.outputExpressions, changeOnlyName = true)), + orderBy = acc.orderBy.map( + substituteExpressions(_, acc.outputExpressions, changeOnlyName = true))) + + case unknown ⇒ + unknown + } + + private def fixAmbiguousOutput(exprs: Seq[NamedExpression], ctx: IgniteQueryContext): Seq[NamedExpression] = + exprs.foldLeft((Set[String](), Set[NamedExpression]())) { + case ((uniqueNames, fixed), next) ⇒ + if (uniqueNames(next.name)) + (uniqueNames, fixed + Alias(next, ctx.uniqueColumnAlias(next))(exprId = next.exprId)) + else + (uniqueNames + next.name, fixed + next) + }._2.toSeq + + /** + * Substitutes each `QueryAccumulator` with a `LogicalRelation` containing an `IgniteSQLAccumulatorRelation`. + * + * @param plan Query plan. + * @return Transformed plan. + */ + private def makeIgniteAccRelation(plan: LogicalPlan): LogicalPlan = + plan.transformDown { + case acc: QueryAccumulator ⇒ + new LogicalRelation ( + relation = IgniteSQLAccumulatorRelation(acc), + output = acc.outputExpressions.map(toAttributeReference(_, Seq.empty)), + catalogTable = acc.igniteQueryContext.catalogTable) + } + + /** + * @param order Order. + * @param global True if the order is applied to the entire result set, false if ordering is per-partition. + * @return True if the sort can be pushed down to Ignite, false otherwise. + */ + private def isSortPushDownAllowed(order: Seq[SortOrder], global: Boolean): Boolean = + global && order.map(_.child).forall(exprsAllowed) + + /** + * @param children Plans to check. + * @return True if all plans are `QueryAccumulator`s, false otherwise. + */ + private def isAllChildrenOptimized(children: Seq[LogicalPlan]): Boolean = + children.forall { + case _: QueryAccumulator ⇒ + true + + case _ ⇒ + false + } + + /** + * Replaces each expression in the `exprs` collection with the expression having the same `exprId` from `substitution`. + * + * @param exprs Expressions to substitute. + * @param substitution Expressions for substitution. + * @param changeOnlyName If true, substitutes only the expression name. + * @tparam T Concrete expression type. + * @return Substituted expressions.
+ */ + private def substituteExpressions[T <: Expression](exprs: Seq[T], substitution: Seq[NamedExpression], + changeOnlyName: Boolean = false): Seq[T] = { + + exprs.map(substituteExpression(_, substitution, changeOnlyName)) + } + + private def substituteExpression[T <: Expression](expr: T, substitution: Seq[NamedExpression], + changeOnlyName: Boolean): T = expr match { + case ne: NamedExpression ⇒ + substitution.find(_.exprId == ne.exprId) match { + case Some(found) ⇒ + if (!changeOnlyName) + found.asInstanceOf[T] + else ne match { + case alias: Alias ⇒ + Alias( + AttributeReference( + found.name, + found.dataType, + nullable = found.nullable, + metadata = found.metadata)( + exprId = found.exprId, + qualifier = found.qualifier, + isGenerated = found.isGenerated), + alias.name) ( + exprId = alias.exprId, + qualifier = alias.qualifier, + explicitMetadata = alias.explicitMetadata, + isGenerated = alias.isGenerated).asInstanceOf[T] + + case attr: AttributeReference ⇒ + attr.copy(name = found.name)( + exprId = found.exprId, + qualifier = found.qualifier, + isGenerated = found.isGenerated).asInstanceOf[T] + + case _ ⇒ ne.asInstanceOf[T] + } + + case None ⇒ + expr.withNewChildren( + substituteExpressions(expr.children, substitution, changeOnlyName)).asInstanceOf[T] + } + + case _ ⇒ + expr.withNewChildren( + substituteExpressions(expr.children, substitution, changeOnlyName)).asInstanceOf[T] + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala index 8860590..1fccc3a 100644 --- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala +++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala @@ -66,8 +66,14 @@ class IgniteSparkSession private(ic: IgniteContext, proxy: SparkSession) extends new IgniteSharedState(ic, sparkContext) /** @inheritdoc */ - @transient override lazy val sessionState: SessionState = - new SessionStateBuilder(self, None).build() + @transient override lazy val sessionState: SessionState = { + val sessionState = new SessionStateBuilder(self, None).build() + + sessionState.experimentalMethods.extraOptimizations = + sessionState.experimentalMethods.extraOptimizations :+ IgniteOptimization + + sessionState + } /** @inheritdoc */ @transient override lazy val conf: RuntimeConfig = proxy.conf http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala index 8613592..29a4e6f 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala @@ -19,13 +19,16 @@ package org.apache.ignite.spark import org.apache.ignite.{Ignite, Ignition} import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} -import org.apache.spark.sql.SparkSession -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSpec, Matchers} +import
org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.scalatest._ import java.lang.{Long ⇒ JLong} import org.apache.ignite.cache.query.SqlFieldsQuery import org.apache.ignite.cache.query.annotations.QuerySqlField -import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.internal.IgnitionEx.loadConfiguration +import org.apache.ignite.spark.AbstractDataFrameSpec.configuration +import org.apache.ignite.spark.impl.IgniteSQLAccumulatorRelation +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.ignite.spark.AbstractDataFrameSpec._ import scala.annotation.meta.field @@ -33,12 +36,13 @@ import scala.reflect.ClassTag /** */ -abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfter { +abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfter + with Assertions { var spark: SparkSession = _ var client: Ignite = _ - override protected def beforeAll() = { + override protected def beforeAll(): Unit = { for (i ← 0 to 3) Ignition.start(configuration("grid-" + i, client = false)) @@ -47,7 +51,7 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn createSparkSession() } - override protected def afterAll() = { + override protected def afterAll(): Unit = { Ignition.stop("client", false) for (i ← 0 to 3) @@ -108,6 +112,7 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn cache.query(qry.setArgs(1L.asInstanceOf[JLong], "Forest Hill")).getAll cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Denver")).getAll cache.query(qry.setArgs(3L.asInstanceOf[JLong], "St. Petersburg")).getAll + cache.query(qry.setArgs(4L.asInstanceOf[JLong], "St. Petersburg")).getAll } def createEmployeeCache(client: Ignite, cacheName: String): Unit = { @@ -119,6 +124,31 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn cache.put("key2", Employee(2, "Sarah Connor", 32, 10000)) cache.put("key3", Employee(3, "Arnold Schwarzenegger", 27, 1000)) } + + def checkQueryData[T](res: DataFrame, expectedRes: Product) + (implicit ord: T ⇒ Ordered[T]): Unit = + checkQueryData(res, expectedRes, _.getAs[T](0)) + + def checkQueryData[Ordered](res: DataFrame, expectedRes: Product, sorter: Row => Ordered) + (implicit ord: Ordering[Ordered]): Unit = { + val data = res.rdd.collect.sortBy(sorter) + + for (i ← 0 until expectedRes.productArity) { + val row = data(i) + + if (row.size == 1) + assert(row(0) == expectedRes.productElement(i), s"row[$i, 0] = ${row(0)} should be equal to ${expectedRes.productElement(i)}") + else { + val expectedRow: Product = expectedRes.productElement(i).asInstanceOf[Product] + + assert(expectedRow.productArity == row.size, s"Row sizes should be equal, but expected.size=${expectedRow.productArity} " + + s"and row.size=${row.size}") + + for (j ← 0 until expectedRow.productArity) + assert(row(j) == expectedRow.productElement(j), s"row[$i, $j] = ${row(j)} should be equal to ${expectedRow.productElement(j)}") + } + } + } } object AbstractDataFrameSpec { @@ -135,7 +165,7 @@ object AbstractDataFrameSpec { val PERSON_TBL_NAME_2 = "person2" def configuration(igniteInstanceName: String, client: Boolean): IgniteConfiguration = { - val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() + val cfg = loadConfiguration(TEST_CONFIG_FILE).get1() cfg.setClientMode(client) @@ -167,6 +197,30 @@ } /** + * @param df Data frame. + * @param qry SQL query.
+ */ + def checkOptimizationResult(df: DataFrame, qry: String = ""): Unit = { + df.explain(true) + + val plan = df.queryExecution.optimizedPlan + + val cnt = plan.collectLeaves.count { + case LogicalRelation(relation: IgniteSQLAccumulatorRelation[_, _], _, _) ⇒ + if (qry != "") + assert(qry.toLowerCase == relation.acc.compileQuery().toLowerCase, + s"Generated query should be equal to the expected one.\nexpected - $qry\ngenerated - ${relation.acc.compileQuery()}") + + true + + case _ ⇒ + false + } + + assert(cnt != 0, s"Plan should contain IgniteSQLAccumulatorRelation") + } + + /** * Enclose some closure, so it doesn't capture the outer object (default Scala behaviour) while serializing. */ def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed) http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala index 6077211..d87d234 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala @@ -58,7 +58,7 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec { it("Should provide ability to query SQL table without explicit registration") { val res = igniteSession.sql("SELECT id, name FROM city").rdd - res.count should equal(3) + res.count should equal(4) val cities = res.collect.sortBy(_.getAs[JLong]("id")) @@ -66,7 +66,8 @@ Array( (1, "Forest Hill"), (2, "Denver"), - (3, "St. Petersburg") + (3, "St. Petersburg"), + (4, "St.
Petersburg") ) ) } @@ -136,7 +137,7 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec { createEmployeeCache(client, EMPLOYEE_CACHE_NAME) - val configProvider = enclose(null) (x â () â { + val configProvider = enclose(null) (_ â () â { val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() cfg.setClientMode(true) http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala index cdd26cd..c5df901 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala @@ -17,9 +17,6 @@ package org.apache.ignite.spark -import java.lang.{Integer â JInteger, String â JString} - -import org.apache.ignite.Ignite import org.apache.ignite.spark.AbstractDataFrameSpec._ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types._ @@ -45,7 +42,7 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec { ("IS_RESIDENT", BooleanType, true), ("SALARY", DoubleType, true), ("PENSION", DoubleType, true), - ("ACCOUNT", DecimalType(10, 0), true), + ("ACCOUNT", IgniteRDD.DECIMAL, true), ("AGE", IntegerType, true), ("ID", LongType, false), ("CITY_ID", LongType, false)) http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala index 2ceb44a..b3f7026 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala @@ -28,5 +28,12 @@ class IgniteDataFrameSuite extends Suites ( new IgniteSQLDataFrameWriteSpec, new IgniteSQLDataFrameIgniteSessionWriteSpec, new IgniteDataFrameWrongConfigSpec, - new IgniteCatalogSpec + new IgniteCatalogSpec, + new IgniteOptimizationSpec, + new IgniteOptimizationStringFuncSpec, + new IgniteOptimizationMathFuncSpec, + new IgniteOptimizationAggregationFuncSpec, + new IgniteOptimizationSystemFuncSpec, + new IgniteOptimizationJoinSpec, + new IgniteOptimizationDisableEnableSpec ) http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala new file mode 100644 index 0000000..d2527c8 --- /dev/null +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark + +import org.apache.ignite.Ignite +import org.apache.ignite.cache.query.SqlFieldsQuery +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import java.lang.{Double ⇒ JDouble, Long ⇒ JLong} + +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose} +import org.apache.spark.sql.ignite.IgniteSparkSession + +/** + */ +@RunWith(classOf[JUnitRunner]) +class IgniteOptimizationAggregationFuncSpec extends AbstractDataFrameSpec { + var igniteSession: IgniteSparkSession = _ + + describe("Supported optimized aggregation functions") { + it("COUNT") { + val df = igniteSession.sql("SELECT count(*) FROM numbers") + + checkOptimizationResult(df, "SELECT count(1) FROM numbers") + + val data = Tuple1(21) + + checkQueryData(df, data) + } + + it("AVG - DECIMAL") { + //TODO: write me + } + + it("AVG - DOUBLE") { + val df = igniteSession.sql("SELECT AVG(val) FROM numbers WHERE id <= 3") + + checkOptimizationResult(df, "SELECT AVG(val) FROM numbers WHERE id IS NOT NULL and id <= 3") + + val data = Tuple1(.5) + + checkQueryData(df, data) + } + + it("MIN - DOUBLE") { + val df = igniteSession.sql("SELECT MIN(val) FROM numbers") + + checkOptimizationResult(df, "SELECT MIN(val) FROM numbers") + + val data = Tuple1(-1.0) + + checkQueryData(df, data) + } + + it("MAX - DOUBLE") { + val df = igniteSession.sql("SELECT MAX(val) FROM numbers") + + checkOptimizationResult(df, "SELECT MAX(val) FROM numbers") + + val data = Tuple1(180.0) + + checkQueryData(df, data) + } + + it("SUM - DOUBLE") { + val df = igniteSession.sql("SELECT SUM(val) FROM numbers WHERE id <= 3") + + checkOptimizationResult(df, "SELECT SUM(val) FROM numbers WHERE id IS NOT NULL and id <= 3") + + val data = Tuple1(1.5) + + checkQueryData(df, data) + } + + it("SUM - DECIMAL - 1") { + val df = igniteSession.sql("SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20)") + + checkOptimizationResult(df, "SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20)") + + df.printSchema() + + val data = Tuple1(new java.math.BigDecimal(10.5).setScale(3)) + + checkQueryData(df, data) + } + + it("SUM - DECIMAL - 2") { + val df = igniteSession.sql("SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20, 21)") + + checkOptimizationResult(df, "SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20, 21)") + + val data = Tuple1(new java.math.BigDecimal(15).setScale(3)) + + checkQueryData(df, data) + } + + it("SUM - LONG") { + val df = igniteSession.sql("SELECT SUM(int_val) FROM numbers WHERE id in (15, 16, 17)") + + checkOptimizationResult(df, "SELECT CAST(SUM(int_val) AS BIGINT) as \"SUM(int_val)\" " + + "FROM numbers WHERE id in (15, 16, 17)") + + val data = Tuple1(6L) + + checkQueryData(df, data) + } + } + + def createNumberTable(client: Ignite, cacheName: String): Unit = { + val cache =
client.cache(cacheName) + + cache.query(new SqlFieldsQuery( + """ + | CREATE TABLE numbers ( + | id LONG, + | val DOUBLE, + | int_val LONG, + | decimal_val DECIMAL(5, 5), + | PRIMARY KEY (id)) WITH "backups=1" + """.stripMargin)).getAll + + var qry = new SqlFieldsQuery("INSERT INTO numbers (id, val) values (?, ?)") + + cache.query(qry.setArgs(1L.asInstanceOf[JLong], .0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(2L.asInstanceOf[JLong], .5.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(3L.asInstanceOf[JLong], 1.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(4L.asInstanceOf[JLong], 2.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(5L.asInstanceOf[JLong], 4.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(6L.asInstanceOf[JLong], -0.5.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(7L.asInstanceOf[JLong], -1.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(8L.asInstanceOf[JLong], 42.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(9L.asInstanceOf[JLong], .51.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(10L.asInstanceOf[JLong], .49.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(11L.asInstanceOf[JLong], 100.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(12L.asInstanceOf[JLong], (Math.E*Math.E).asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(13L.asInstanceOf[JLong], Math.PI.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(14L.asInstanceOf[JLong], 180.0.asInstanceOf[JDouble])).getAll + + qry = new SqlFieldsQuery("INSERT INTO numbers (id, int_val) values (?, ?)") + + cache.query(qry.setArgs(15L.asInstanceOf[JLong], 1L.asInstanceOf[JLong])).getAll + cache.query(qry.setArgs(16L.asInstanceOf[JLong], 2L.asInstanceOf[JLong])).getAll + cache.query(qry.setArgs(17L.asInstanceOf[JLong], 3L.asInstanceOf[JLong])).getAll + + qry = new SqlFieldsQuery("INSERT INTO numbers (id, decimal_val) values (?, ?)") + + cache.query(qry.setArgs(18L.asInstanceOf[JLong], new java.math.BigDecimal(2.5))).getAll + cache.query(qry.setArgs(19L.asInstanceOf[JLong], new java.math.BigDecimal(3.5))).getAll + cache.query(qry.setArgs(20L.asInstanceOf[JLong], new java.math.BigDecimal(4.5))).getAll + cache.query(qry.setArgs(21L.asInstanceOf[JLong], new java.math.BigDecimal(4.5))).getAll + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + createNumberTable(client, DEFAULT_CACHE) + + val configProvider = enclose(null) (x ⇒ () ⇒ { + val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() + + cfg.setClientMode(true) + + cfg.setIgniteInstanceName("client-2") + + cfg + }) + + igniteSession = IgniteSparkSession.builder() + .config(spark.sparkContext.getConf) + .igniteConfigProvider(configProvider) + .getOrCreate() + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala new file mode 100644 index 0000000..7912cd0 --- /dev/null +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark + +import org.apache.ignite.Ignite +import org.apache.ignite.cache.query.SqlFieldsQuery +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose} +import org.apache.spark.sql.ignite.IgniteSparkSession +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import java.lang.{Long ⇒ JLong} +import java.util.{Date ⇒ JDate} +import java.text.SimpleDateFormat +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeUnit.DAYS + +/** + */ +@RunWith(classOf[JUnitRunner]) +class IgniteOptimizationDateFuncSpec extends AbstractDataFrameSpec { + var igniteSession: IgniteSparkSession = _ + + val format = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss") + + describe("Supported optimized date functions") { + it(" - CURRENT_TIMESTAMP") { + val df = igniteSession.sql("SELECT id, CURRENT_TIMESTAMP() FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = df.rdd.collect + + assert(data(0).getAs[JLong]("id") == 1L) + + val date: JDate = data(0).getAs[JDate]("current_timestamp()") + val millisDiff = new JDate().getTime - date.getTime + + assert(millisDiff <= 30000) + } + + it(" - CURRENT_DATE") { + val df = igniteSession.sql("SELECT id, CURRENT_DATE() FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = df.rdd.collect + + assert(data(0).getAs[JLong]("id") == 1L) + + val date: JDate = data(0).getAs[JDate]("current_date()") + val dayDiff = DAYS.convert(new JDate().getTime - date.getTime, TimeUnit.MILLISECONDS) + + assert(dayDiff <= 1) + } + + it(" - HOUR") { + val df = igniteSession.sql("SELECT HOUR(val) FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(0) + + checkQueryData(df, data) + } + + it(" - MINUTE") { + val df = igniteSession.sql("SELECT MINUTE(val) FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(0) + + checkQueryData(df, data) + } + + it(" - SECOND") { + val df = igniteSession.sql("SELECT SECOND(val) FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(0) + + checkQueryData(df, data) + } + + it(" - MONTH") { + val df = igniteSession.sql("SELECT MONTH(val) FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(0) + + checkQueryData(df, data) + } + + it(" - YEAR") { + val df = igniteSession.sql("SELECT YEAR(val) FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(2017) + + checkQueryData(df, data) + } + + it(" - QUARTER") { + val df = igniteSession.sql("SELECT QUARTER(val) FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(1) + + checkQueryData(df, data) + } + + it(" - WEEK") { + val df = igniteSession.sql("SELECT WEEKOFYEAR(val) FROM dates
WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(1) + + checkQueryData(df, data) + } + + it(" - DAY_OF_MONTH") { + val df = igniteSession.sql("SELECT DAYOFMONTH(val) FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(1) + + checkQueryData(df, data) + } + + it(" - DAY_OF_YEAR") { + val df = igniteSession.sql("SELECT DAYOFYEAR(val) FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(1) + + checkQueryData(df, data) + } + + it(" - DATE_ADD") { + val df = igniteSession.sql("SELECT DATE_ADD(val, 2) FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(format.parse("03.01.2017 00:00:00")) + + checkQueryData(df, data) + } + + it(" - DATEDIFF") { + val df = igniteSession.sql("SELECT " + + "DATEDIFF(val, TO_DATE('2017-01-02 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS')) FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1(1) + + checkQueryData(df, data) + } + + it(" - FORMATDATETIME") { + val df = igniteSession.sql("SELECT DATE_FORMAT(val, 'yyyy-MM-dd HH:mm:ss.SSS') FROM dates WHERE id = 1") + + checkOptimizationResult(df) + + val data = Tuple1("2017-01-01 00:00:00.000") + + checkQueryData(df, data) + } + } + + def createDateTable(client: Ignite, cacheName: String): Unit = { + val cache = client.cache(cacheName) + + cache.query(new SqlFieldsQuery( + """ + | CREATE TABLE dates ( + | id LONG, + | val DATE, + | PRIMARY KEY (id)) WITH "backups=1" + """.stripMargin)).getAll + + val qry = new SqlFieldsQuery("INSERT INTO dates(id, val) values (?, ?)") + + cache.query(qry.setArgs(1L.asInstanceOf[JLong], format.parse("01.01.2017 00:00:00"))).getAll + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + createDateTable(client, DEFAULT_CACHE) + + val configProvider = enclose(null) (x â () â { + val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() + + cfg.setClientMode(true) + + cfg.setIgniteInstanceName("client-2") + + cfg + }) + + igniteSession = IgniteSparkSession.builder() + .config(spark.sparkContext.getConf) + .igniteConfigProvider(configProvider) + .getOrCreate() + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala new file mode 100644 index 0000000..033af74 --- /dev/null +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark + +import org.apache.ignite.spark.AbstractDataFrameSpec.TEST_CONFIG_FILE +import org.apache.ignite.spark.IgniteDataFrameSettings._ +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.ignite.IgniteOptimization +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + +/** + */ +@RunWith(classOf[JUnitRunner]) +class IgniteOptimizationDisableEnableSpec extends AbstractDataFrameSpec { + var personDataFrame: DataFrame = _ + + describe("Ignite Optimization Disabling/Enabling") { + it("should add Ignite Optimization to a session on the first query") { + if (spark.sparkContext.isStopped) + createSparkSession() + + assert(!igniteOptimizationExists(spark), "Session shouldn't contain IgniteOptimization") + + personDataFrame = spark.read + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "person") + .load() + + assert(igniteOptimizationExists(spark), + "Session should contain IgniteOptimization after executing a query over an Ignite Data Frame") + + spark.stop() + } + + it("should remove Ignite Optimization if it is disabled at runtime") { + if (!spark.sparkContext.isStopped) + spark.stop() + + val newSession = SparkSession.builder() + .appName("Ignite Optimization check") + .master("local") + .config("spark.executor.instances", "2") + .getOrCreate() + + assert(!igniteOptimizationExists(newSession), "Session shouldn't contain IgniteOptimization") + + var newPersonDataFrame = newSession.read + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "person") + .load() + + assert(igniteOptimizationExists(newSession), + "Session should contain IgniteOptimization after executing a query over an Ignite Data Frame") + + newSession.conf.set(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "true") + + newPersonDataFrame = newSession.read + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "person") + .load() + + assert(!igniteOptimizationExists(newSession), + "Session shouldn't contain IgniteOptimization") + + newSession.close() + } + + it("shouldn't add Ignite Optimization to a session when it's disabled") { + if (!spark.sparkContext.isStopped) + spark.stop() + + val newSession = SparkSession.builder() + .appName("Ignite Optimization check") + .master("local") + .config("spark.executor.instances", "2") + .config(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "true") + .getOrCreate() + + assert(!igniteOptimizationExists(newSession), "Session shouldn't contain IgniteOptimization") + + val newPersonDataFrame = newSession.read + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "person") + .load() + + newPersonDataFrame.createOrReplaceTempView("person") + + val res = newSession.sqlContext.sql("SELECT name FROM person WHERE id = 2").rdd + + res.count should equal(1) + + assert(!igniteOptimizationExists(newSession), "Session shouldn't contain IgniteOptimization") + + newSession.close() + } + } + + def igniteOptimizationExists(session: SparkSession): Boolean = + session.sessionState.experimentalMethods.extraOptimizations.contains(IgniteOptimization) + + override protected def beforeAll(): Unit = { + super.beforeAll() + + createPersonTable(client, "cache1") + } +}
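As a usage sketch of the enable/disable switch exercised above: the option names come from IgniteDataFrameSettings exactly as used in this spec, while the application name and the config file path are hypothetical:

    import org.apache.ignite.spark.IgniteDataFrameSettings._
    import org.apache.spark.sql.SparkSession

    // Build a plain session with Ignite SQL push-down disabled up front;
    // reading an Ignite data frame will then not register IgniteOptimization.
    val session = SparkSession.builder()
        .appName("ignite-optimization-example") // hypothetical name
        .master("local")
        .config(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "true")
        .getOrCreate()

    val persons = session.read
        .format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, "ignite-config.xml") // hypothetical path
        .option(OPTION_TABLE, "person")
        .load()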
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala
new file mode 100644
index 0000000..b4b36a8
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+import java.lang.{Long ⇒ JLong}
+
+/**
+ */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationJoinSpec extends AbstractDataFrameSpec {
+    var igniteSession: IgniteSparkSession = _
+
+    describe("Optimized join queries") {
+        it("UNION") {
+            val qry =
+                """
+                  | SELECT id, val1 as val FROM jt1 UNION
+                  | SELECT id, val2 as val FROM jt2 UNION
+                  | SELECT id, val3 as val FROM jt3
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT id, val FROM (SELECT id, val1 as val FROM jt1 UNION " +
+                    "SELECT id, val2 as val FROM jt2 UNION " +
+                    "SELECT id, val3 as val FROM jt3) table1")
+
+            val data = (
+                (1L, "A"),
+                (1L, "B"),
+                (2L, "B"),
+                (2L, "C"),
+                (2L, "D"),
+                (3L, "C"),
+                (3L, "D"),
+                (3L, "E"))
+
+            checkQueryData(df, data, row ⇒ (row.getAs[JLong](0), row.getAs[String](1)))
+        }
+
+        it("UNION ALL") {
+            val qry =
+                """
+                  | SELECT id, val1 as val FROM jt1 UNION ALL
+                  | SELECT id, val2 as val FROM jt2 UNION ALL
+                  | SELECT id, val3 as val FROM jt3
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT id, val1 as val FROM jt1 UNION " +
+                    "SELECT id, val2 as val FROM jt2 UNION " +
+                    "SELECT id, val3 as val FROM jt3")
+
+            val data = (
+                (1L, "A"),
+                (1L, "B"),
+                (2L, "B"),
+                (2L, "C"),
+                (2L, "D"),
+                (3L, "C"),
+                (3L, "D"),
+                (3L, "E"))
+
+            checkQueryData(df, data, row ⇒ (row.getAs[JLong](0), row.getAs[String](1)))
+        }
+
+        it("UNION ALL ORDER") {
+            val qry =
+                """
+                  | SELECT id, val1 as val FROM jt1 UNION ALL
+                  | SELECT id, val2 as val FROM jt2 UNION ALL
+                  | SELECT id, val3 as val FROM jt3
+                  | ORDER BY id DESC, val
+                  |""".stripMargin
+
+            val df = igniteSession.sql(qry)
+
+            checkOptimizationResult(df,
+                "SELECT id, val1 as val FROM jt1 UNION " +
+ "SELECT id, val2 as val FROM jt2 UNION " + + "SELECT id, val3 as val FROM jt3 " + + "ORDER BY id DESC, val") + + val data = ( + (3L, "C"), + (3L, "D"), + (3L, "E"), + (2L, "B"), + (2L, "C"), + (2L, "D"), + (1L, "A"), + (1L, "B") + ) + + checkQueryData(df, data, _ â 0) + } + + it("UNION WITH AGGREGATE") { + val qry = + """ + | SELECT VAL, COUNT(*) FROM ( + | SELECT id, val1 as val FROM jt1 UNION + | SELECT id, val2 as val FROM jt2 UNION + | SELECT id, val3 as val FROM jt3 ) t1 + | GROUP BY val HAVING COUNT(*) > 1 + |""".stripMargin + + val df = igniteSession.sql(qry) + + checkOptimizationResult(df, + "SELECT VAL, count(1) FROM (" + + "SELECT id, val1 AS val FROM JT1 UNION " + + "SELECT id, val2 AS val FROM JT2 UNION " + + "SELECT id, val3 AS val FROM JT3" + + ") table1 GROUP BY val HAVING count(1) > 1") + + val data = ( + ("B", 2L), + ("C", 2L), + ("D", 2L) + ) + + checkQueryData(df, data) + } + + it("AGGREGATE ON AGGREGATE RESULT") { + val qry = + """ + | SELECT SUM(cnt) FROM ( + | SELECT VAL, COUNT(*) as CNT FROM ( + | SELECT id, val1 as val FROM jt1 UNION + | SELECT id, val2 as val FROM jt2 UNION + | SELECT id, val3 as val FROM jt3 ) t1 + | GROUP BY val HAVING COUNT(*) > 1 + | ) t1 + |""".stripMargin + + val df = igniteSession.sql(qry) + + checkOptimizationResult(df, + "SELECT CAST(SUM(cnt) as BIGINT) as \"SUM(cnt)\" FROM (" + + "SELECT count(1) as cnt FROM (" + + "SELECT id, val1 as val FROM jt1 UNION " + + "SELECT id, val2 as val FROM jt2 UNION " + + "SELECT id, val3 as val FROM jt3" + + ") table1 GROUP BY val HAVING count(1) > 1) table2") + + val data = Tuple1(6.0) + + checkQueryData(df, data) + } + + it("SELF INNER JOIN") { + val qry = + """ + |SELECT + | jt1.id, + | jt1.val1, + | jt2.id, + | jt2.val1 + |FROM + | jt1 JOIN + | jt1 as jt2 ON jt1.val1 = jt2.val1 + |""".stripMargin + + val df = igniteSession.sql(qry) + + checkOptimizationResult(df, "SELECT JT1.ID, JT1.VAL1, table1.ID, table1.VAL1 " + + "FROM JT1 JOIN JT1 AS table1 ON jt1.val1 = table1.val1 " + + "WHERE jt1.val1 IS NOT NULL AND table1.val1 IS NOT NULL") + + val data = ( + (1, "A", 1, "A"), + (2, "B", 2, "B"), + (3, "C", 3, "C") + ) + + checkQueryData(df, data) + } + + + it("SELF INNER JOIN WITH WHERE") { + val qry = + """ + |SELECT + | jt1.id, + | jt1.val1, + | jt2.id, + | jt2.val1 + |FROM + | jt1 JOIN + | jt1 as jt2 ON jt1.val1 = jt2.val1 + |WHERE jt2.val1 = 'A' + |""".stripMargin + + val df = igniteSession.sql(qry) + +/* checkOptimizationResult(df, "SELECT JT1.ID, JT1.VAL1, table1.ID, table1.VAL1 " + + "FROM JT1 JOIN JT1 as table1 ON JT1.val1 = table1.val1 " + + "WHERE JT1.val1 = 'A' AND JT1.val1 IS NOT NULL AND table1.val1 IS NOT NULL AND table1.val1 = 'A'")*/ + + val data = Tuple1( + (1, "A", 1, "A") + ) + + checkQueryData(df, data) + } + + + it("INNER JOIN") { + val qry = + """ + |SELECT + | jt1.id as id1, + | jt1.val1, + | jt2.id as id2, + | jt2.val2 + |FROM + | jt1 JOIN + | jt2 ON jt1.val1 = jt2.val2 + |""".stripMargin + + val df = igniteSession.sql(qry) + + checkOptimizationResult(df, "SELECT JT1.ID AS id1, JT1.VAL1, JT2.ID AS id2, JT2.VAL2 " + + "FROM JT1 JOIN JT2 ON jt1.val1 = jt2.val2 " + + "WHERE jt1.val1 IS NOT NULL AND jt2.val2 IS NOT NULL") + + val data = ( + (2, "B", 1, "B"), + (3, "C", 2, "C") + ) + + checkQueryData(df, data) + } + + it("INNER JOIN WITH WHERE") { + val qry = + """ + |SELECT + | jt1.id as id1, + | jt1.val1, + | jt2.id as id2, + | jt2.val2 + |FROM + | jt1 JOIN + | jt2 ON jt1.val1 = jt2.val2 + |WHERE + | jt1.id < 10 + |""".stripMargin + + val df = igniteSession.sql(qry) + + 
checkOptimizationResult(df, "SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2 " + + "FROM jt1 JOIN jt2 ON jt1.val1 = jt2.val2 " + + "WHERE jt1.id IS NOT NULL AND jt1.id < 10 AND jt1.val1 IS NOT NULL and jt2.val2 IS NOT NULL") + + val data = ( + (2, "B", 1, "B"), + (3, "C", 2, "C") + ) + + checkQueryData(df, data) + } + + it("LEFT JOIN") { + val qry = + """ + |SELECT + | jt1.id as id1, + | jt1.val1, + | jt2.id as id2, + | jt2.val2 + |FROM + | jt1 LEFT JOIN + | jt2 ON jt1.val1 = jt2.val2 + |""".stripMargin + + val df = igniteSession.sql(qry) + + checkOptimizationResult(df, "SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2 " + + "FROM jt1 LEFT JOIN jt2 ON jt1.val1 = jt2.val2") + + val data = ( + (1, "A", null, null), + (2, "B", 1, "B"), + (3, "C", 2, "C") + ) + + checkQueryData(df, data) + } + + it("RIGHT JOIN") { + val qry = + """ + |SELECT + | jt1.id as id1, + | jt1.val1, + | jt2.id as id2, + | jt2.val2 + |FROM + | jt1 RIGHT JOIN + | jt2 ON jt1.val1 = jt2.val2 + |""".stripMargin + + val df = igniteSession.sql(qry) + + checkOptimizationResult(df, "SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2 " + + "FROM jt1 RIGHT JOIN jt2 ON jt1.val1 = jt2.val2") + + val data = ( + (2, "B", 1, "B"), + (3, "C", 2, "C"), + (null, null, 3, "D") + ) + + checkQueryData(df, data, r â if (r.get(0) == null) 100L else r.getAs[Long](0)) + } + + it("JOIN 3 TABLE") { + val qry = + """ + |SELECT + | jt1.id as id1, + | jt1.val1 as val1, + | jt2.id as id2, + | jt2.val2 as val2, + | jt3.id as id3, + | jt3.val3 as val3 + |FROM + | jt1 LEFT JOIN + | jt2 ON jt1.val1 = jt2.val2 LEFT JOIN + | jt3 ON jt1.val1 = jt3.val3 + |""".stripMargin + + val df = igniteSession.sql(qry) + + checkOptimizationResult(df, + "SELECT table1.id as id1, table1.val1, table1.id_2 as id2, table1.val2, jt3.id as id3, jt3.val3 " + + "FROM (" + + "SELECT jt1.val1, jt1.id, jt2.val2, jt2.id as id_2 " + + "FROM JT1 LEFT JOIN jt2 ON jt1.val1 = jt2.val2) table1 LEFT JOIN " + + "jt3 ON table1.val1 = jt3.val3") + + val data = ( + (1, "A", null, null, 1, "A"), + (2, "B", 1, "B", null, null), + (3, "C", 2, "C", null, null)) + + checkQueryData(df, data) + } + + it("JOIN 3 TABLE AND AGGREGATE") { + val qry = + """ + |SELECT SUM(id1) FROM ( + | SELECT + | jt1.id as id1, + | jt1.val1 as val1, + | jt2.id as id2, + | jt2.val2 as val2, + | jt3.id as id3, + | jt3.val3 as val3 + |FROM + | jt1 LEFT JOIN + | jt2 ON jt1.val1 = jt2.val2 LEFT JOIN + | jt3 ON jt1.val1 = jt3.val3 + |) WHERE CONCAT(val1, val2) = 'BB' OR CONCAT(val1, val3) = 'AA' + |""".stripMargin + + val df = igniteSession.sql(qry) + + checkOptimizationResult(df, + "SELECT CAST(SUM(table1.ID) AS BIGINT) AS \"sum(id1)\" FROM " + + "(SELECT JT1.VAL1, JT1.ID, JT2.VAL2 FROM JT1 LEFT JOIN JT2 ON JT1.val1 = JT2.val2) table1 LEFT JOIN " + + "JT3 ON table1.val1 = JT3.val3 " + + "WHERE CONCAT(table1.val1, table1.val2) = 'BB' OR CONCAT(table1.val1, JT3.val3) = 'AA'") + + val data = Tuple1(3) + + checkQueryData(df, data, _ â 0) + } + + it("INNER JOIN SUBQUERY") { + val qry = + """ + |SELECT sum_id, val1, val2 FROM ( + | SELECT + | jt1.id + jt2.id as sum_id, + | jt1.val1 as val1, + | jt2.val2 as val2 + | FROM + | jt1 JOIN + | jt2 ON jt1.val1 = jt2.val2 + |) t1 WHERE sum_id != 15 + |""".stripMargin + + val df = igniteSession.sql(qry) + + checkOptimizationResult(df, + "SELECT jt1.id + jt2.id as sum_id, jt1.val1, jt2.val2 FROM " + + "jt1 JOIN jt2 ON NOT jt1.id + jt2.id = 15 AND jt1.val1 = jt2.val2 " + + "WHERE " + + "jt1.id IS NOT NULL AND " + + "jt1.val1 IS NOT NULL AND " + + "jt2.id IS NOT NULL AND " + 
+ "jt2.val2 IS NOT NULL" + ) + + val data = ( + (3, "B", "B"), + (5, "C", "C") + ) + + checkQueryData(df, data) + } + + it("INNER JOIN SUBQUERY - 2") { + val qry = + """ + |SELECT SUM(sum_id) FROM ( + | SELECT + | jt1.id + jt2.id as sum_id + | FROM + | jt1 JOIN + | jt2 ON jt1.val1 = jt2.val2 + |) t1 WHERE sum_id != 15 + |""".stripMargin + + val df = igniteSession.sql(qry) + + checkOptimizationResult(df, + "SELECT CAST(SUM(JT1.ID + JT2.ID) AS BIGINT) AS \"sum(sum_id)\" " + + "FROM JT1 JOIN JT2 ON NOT JT1.id + JT2.id = 15 AND JT1.val1 = JT2.val2 " + + "WHERE JT1.id IS NOT NULL AND JT1.val1 IS NOT NULL AND JT2.id IS NOT NULL AND JT2.val2 IS NOT NULL") + + val data = Tuple1(8) + + checkQueryData(df, data) + } + } + + def createJoinedTables(client: Ignite, cacheName: String): Unit = { + val cache = client.cache(cacheName) + + cache.query(new SqlFieldsQuery( + """ + | CREATE TABLE jt1 ( + | id LONG, + | val1 VARCHAR, + | PRIMARY KEY (id)) WITH "backups=1" + """.stripMargin)).getAll + + cache.query(new SqlFieldsQuery( + """ + | CREATE TABLE jt2 ( + | id LONG, + | val2 VARCHAR, + | PRIMARY KEY (id)) WITH "backups=1" + """.stripMargin)).getAll + + cache.query(new SqlFieldsQuery( + """ + | CREATE TABLE jt3 ( + | id LONG, + | val3 VARCHAR, + | PRIMARY KEY (id)) WITH "backups=1" + """.stripMargin)).getAll + + var qry = new SqlFieldsQuery("INSERT INTO jt1 (id, val1) values (?, ?)") + + cache.query(qry.setArgs(1L.asInstanceOf[JLong], "A")).getAll + cache.query(qry.setArgs(2L.asInstanceOf[JLong], "B")).getAll + cache.query(qry.setArgs(3L.asInstanceOf[JLong], "C")).getAll + + qry = new SqlFieldsQuery("INSERT INTO jt2 (id, val2) values (?, ?)") + + cache.query(qry.setArgs(1L.asInstanceOf[JLong], "B")).getAll + cache.query(qry.setArgs(2L.asInstanceOf[JLong], "C")).getAll + cache.query(qry.setArgs(3L.asInstanceOf[JLong], "D")).getAll + + qry = new SqlFieldsQuery("INSERT INTO jt3 (id, val3) values (?, ?)") + + cache.query(qry.setArgs(1L.asInstanceOf[JLong], "A")).getAll + cache.query(qry.setArgs(2L.asInstanceOf[JLong], "D")).getAll + cache.query(qry.setArgs(3L.asInstanceOf[JLong], "E")).getAll + + cache.query(new SqlFieldsQuery("CREATE INDEX idx1 ON jt1(val1)")).getAll + cache.query(new SqlFieldsQuery("CREATE INDEX idx2 ON jt2(val2)")).getAll + cache.query(new SqlFieldsQuery("CREATE INDEX idx3 ON jt3(val3)")).getAll + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + createPersonTable(client, DEFAULT_CACHE) + + createCityTable(client, DEFAULT_CACHE) + + createJoinedTables(client, DEFAULT_CACHE) + + val configProvider = enclose(null) (x â () â { + val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() + + cfg.setClientMode(true) + + cfg.setIgniteInstanceName("client-2") + + cfg + }) + + igniteSession = IgniteSparkSession.builder() + .config(spark.sparkContext.getConf) + .igniteConfigProvider(configProvider) + .getOrCreate() + } +}
