Repository: carbondata Updated Branches: refs/heads/master c5bfe4acf -> afe2b669b
[CARBONDATA-3136] Fix JVM crash with preaggregate datamap when average of decimal column is taken with ORDER BY. Problem: A JVM crash occurs with a preaggregate datamap when the average of a decimal column is queried with ORDER BY. Cause: When the query plan is rewritten to use the preaggregate datamap, the decimal column is cast to double inside the average expression. This led to a JVM crash in Spark because the result was filled with the wrong precision (the call stack is attached to the JIRA issue). Solution: For decimal columns, the operands of the average's division (the sum and the count) are now cast to the column's own decimal type instead of double, so the division is performed in decimal rather than double precision. This closes #2958 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/afe2b669 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/afe2b669 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/afe2b669 Branch: refs/heads/master Commit: afe2b669bdab68f1528bc861e72480b1b37509a3 Parents: c5bfe4a Author: ajantha-bhat <ajanthab...@gmail.com> Authored: Tue Nov 27 19:37:49 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Wed Nov 28 18:23:17 2018 +0530 ---------------------------------------------------------------------- .../TestPreAggregateExpressions.scala | 11 +++++++++ .../sql/hive/CarbonPreAggregateRules.scala | 26 +++++++++++++++----- 2 files changed, 31 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe2b669/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala index 
b3b71a6..a7511fd 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala @@ -164,6 +164,17 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll { } } + test("Test Pre_aggregate with decimal column with order by") { + sql("drop table if exists maintable") + sql("create table maintable(name string, decimal_col decimal(30,16)) stored by 'carbondata'") + sql("insert into table maintable select 'abc',452.564") + sql( + "create datamap ag1 on table maintable using 'preaggregate' as select name,avg(decimal_col)" + + " from maintable group by name") + checkAnswer(sql("select avg(decimal_col) from maintable group by name order by name"), + Seq(Row(452.56400000000000000000))) + } + override def afterAll: Unit = { sql("DROP TABLE IF EXISTS mainTable") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe2b669/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 76ff41a..9b204f8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -989,8 +989,15 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule Cast(AggregateExpression(Count(exp), aggExp.mode, false), DoubleType)) newExp case Average(exp: Expression) => - val newExp = Seq(AggregateExpression(Sum(Cast(exp, DoubleType)), aggExp.mode, false), - 
Cast(AggregateExpression(Count(exp), aggExp.mode, false), DoubleType)) + val dataType = + if (exp.dataType.isInstanceOf[DecimalType]) { + // decimal must not go as double precision. + exp.dataType.asInstanceOf[DecimalType] + } else { + DoubleType + } + val newExp = Seq(AggregateExpression(Sum(Cast(exp, dataType)), aggExp.mode, false), + Cast(AggregateExpression(Count(exp), aggExp.mode, false), dataType)) newExp case _ => val newExp = Seq(aggExp) @@ -1663,6 +1670,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule false)) } case Average(exp: Expression) => + val dataType = + if (exp.dataType.isInstanceOf[DecimalType]) { + // decimal must not go as double precision. + exp.dataType.asInstanceOf[DecimalType] + } else { + DoubleType + } // for handling Normal table case/Aggregate node added in case of streaming table if (!isStreamingTable) { // In case of average aggregate function select 2 columns from aggregate table @@ -1670,24 +1684,24 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // Then add divide(sum(column with sum), sum(column with count)). Seq(Divide(AggregateExpression(Sum(Cast( attrs.head, - DoubleType)), + dataType)), aggExp.mode, false), AggregateExpression(Sum(Cast( attrs.last, - DoubleType)), + dataType)), aggExp.mode, false))) } else { // in case of streaming aggregate table return two aggregate function sum and count Seq(AggregateExpression(Sum(Cast( attrs.head, - DoubleType)), + dataType)), aggExp.mode, false), AggregateExpression(Sum(Cast( attrs.last, - DoubleType)), + dataType)), aggExp.mode, false)) }