This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 6ed72fb [CARBONDATA-3716] Fixed spark 2.4 UT failures 6ed72fb is described below commit 6ed72fb85d42865916095eee8502b9f1d6bc530c Author: akkio-97 <akshay.nuth...@gmail.com> AuthorDate: Mon Mar 2 12:32:32 2020 +0530 [CARBONDATA-3716] Fixed spark 2.4 UT failures Why is this PR needed? Issue: a) In 2.4 subquery alias produces an output set with alias identifier(that is both database_name and table_name) which causes failures. Solution- Get subquery alias output with only table_name as alias b) In 2.4 a new variable was introduced "spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation" which is by default set to "false". While refreshing table In 2.4 we need to set this variable to "true" in order to create table in non empty location. Does this PR introduce any user interface change? No Is any new testcase added? No This closes #3629 --- .../TestBroadCastSIFilterPushJoinWithUDF.scala | 229 +++++++++++---------- .../TestCTASWithSecondaryIndex.scala | 45 ++-- .../management/RefreshCarbonTableCommand.scala | 25 ++- .../apache/spark/sql/CarbonToSparkAdapter.scala | 7 +- .../apache/spark/sql/CarbonToSparkAdapter.scala | 9 +- .../testsuite/binary/TestBinaryDataType.scala | 19 +- .../org/apache/spark/util/SparkUtilTest.scala | 8 +- .../apache/carbondata/mv/extension/MVHelper.scala | 2 +- .../apache/carbondata/mv/extension/MVUtil.scala | 4 +- .../mv/rewrite/SummaryDatasetCatalog.scala | 15 +- .../mv/plans/modular/ModularPatterns.scala | 2 +- .../mv/plans/util/Logical2ModularExtractions.scala | 2 +- .../apache/carbondata/mv/plans/util/Printers.scala | 4 +- 13 files changed, 226 insertions(+), 145 deletions(-) diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala index 81de97c..c0693ec 100644 --- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala +++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestBroadCastSIFilterPushJoinWithUDF.scala @@ -18,6 +18,7 @@ package org.apache.carbondata.spark.testsuite.secondaryindex import org.apache.spark.sql.DataFrame import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.util.SparkUtil import org.scalatest.BeforeAndAfterAll /** @@ -251,122 +252,142 @@ class TestBroadCastSIFilterPushJoinWithUDF extends QueryTest with BeforeAndAfter test("test all the above udfs") { // all the above udf - carbonQuery = sql( - "select approx_count_distinct(empname), approx_count_distinct(deptname), collect_list" + - "(empname), collect_set(deptname), corr(deptno, empno), covar_pop(deptno, empno), " + - "covar_samp(deptno, empno), grouping(designation), grouping(deptname), mean(deptno), mean" + - "(empno),skewness(deptno), skewness(empno), stddev(deptno), stddev(empno), stddev_pop" + - "(deptno), stddev_pop(empno), stddev_samp(deptno), stddev_samp(empno), var_pop(deptno), " + - "var_pop(empno), var_samp(deptno), var_samp(empno), variance(deptno), variance(empno), " + - "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3," + - " 2), 16, 10), '') from udfValidation where empname = 'pramod' or deptname = 'network' or " + - "designation='TL' group by designation, deptname, empname with ROLLUP") - hiveQuery = sql( - "select approx_count_distinct(empname), approx_count_distinct(deptname), collect_list" + - "(empname), collect_set(deptname), corr(deptno, empno), covar_pop(deptno, empno), " + - "covar_samp(deptno, empno), grouping(designation), grouping(deptname), mean(deptno), mean" + - "(empno),skewness(deptno), skewness(empno), stddev(deptno), stddev(empno), stddev_pop" + - "(deptno), stddev_pop(empno), stddev_samp(deptno), stddev_samp(empno), var_pop(deptno), " + - "var_pop(empno), var_samp(deptno), var_samp(empno), variance(deptno), variance(empno), " + - "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3," + - " 2), 16, 10), '') from udfHive where empname = 'pramod' or deptname = 'network' or " + - "designation='TL' group by designation, deptname, empname with ROLLUP") - if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) { - assert(true) - } else { - assert(false) - } - checkAnswer(carbonQuery, hiveQuery) + // TO DO, need to remove this check, once JIRA for spark 2.4 has been resolved (SPARK-30974) + if(SparkUtil.isSparkVersionEqualTo("2.3")) + { + carbonQuery = sql( + "select approx_count_distinct(empname), approx_count_distinct(deptname), collect_list" + + "(empname), collect_set(deptname), corr(deptno, empno), covar_pop(deptno, empno), " + + "covar_samp(deptno, empno), grouping(designation), grouping(deptname), mean(deptno), mean" + + "(empno),skewness(deptno), skewness(empno), stddev(deptno), stddev(empno), stddev_pop" + + "(deptno), stddev_pop(empno), stddev_samp(deptno), stddev_samp(empno), var_pop(deptno), " + + "var_pop(empno), var_samp(deptno), var_samp(empno), variance(deptno), variance(empno), " + + "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3," + + " 2), 16, 10), '') from udfValidation where empname = 'pramod' or deptname = 'network' or " + + "designation='TL' group by designation, deptname, empname with ROLLUP") + hiveQuery = sql( + "select approx_count_distinct(empname), approx_count_distinct(deptname), collect_list" + + "(empname), collect_set(deptname), corr(deptno, empno), covar_pop(deptno, empno), " + + "covar_samp(deptno, empno), grouping(designation), grouping(deptname), mean(deptno), mean" + + "(empno),skewness(deptno), skewness(empno), stddev(deptno), stddev(empno), stddev_pop" + + "(deptno), stddev_pop(empno), stddev_samp(deptno), stddev_samp(empno), var_pop(deptno), " + + "var_pop(empno), var_samp(deptno), var_samp(empno), variance(deptno), variance(empno), " + + "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3," + + " 2), 16, 10), '') from udfHive where empname = 'pramod' or deptname = 'network' or " + + "designation='TL' group by designation, deptname, empname with ROLLUP") + if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) { + assert(true) + } else { + assert(false) + } + checkAnswer(carbonQuery, hiveQuery) + } + } test("test alias of all the above udf") { // alias all the above udf - carbonQuery = sql( - "select approx_count_distinct(empname) as c1, approx_count_distinct(deptname) as c2, collect_list" + - "(empname) as c3, collect_set(deptname) as c4, corr(deptno, empno) as c5, covar_pop(deptno, empno) as c6, " + - "covar_samp(deptno, empno) as c7, grouping(designation) as c8, grouping(deptname) as c9, mean(deptno) as c10, mean" + - "(empno) as c11,skewness(deptno) as c12, skewness(empno) as c13, stddev(deptno) as c14, stddev(empno) as c15, stddev_pop" + - "(deptno) as c16, stddev_pop(empno) as c17, stddev_samp(deptno) as c18, stddev_samp(empno) as c18, var_pop(deptno) as c19, " + - "var_pop(empno) as c20, var_samp(deptno) as c21, var_samp(empno) as c22, variance(deptno) as c23, variance(empno) as c24, " + - "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c25, COALESCE(CONV(substring(deptname, 3," + - " 2), 16, 10), '') as c26 from udfValidation where empname = 'pramod' or deptname = 'network' or " + - "designation='TL' group by designation, deptname, empname with ROLLUP") - hiveQuery = sql( - "select approx_count_distinct(empname) as c1, approx_count_distinct(deptname) as c2, collect_list" + - "(empname) as c3, collect_set(deptname) as c4, corr(deptno, empno) as c5, covar_pop(deptno, empno) as c6, " + - "covar_samp(deptno, empno) as c7, grouping(designation) as c8, grouping(deptname) as c9, mean(deptno) as c10, mean" + - "(empno) as c11,skewness(deptno) as c12, skewness(empno) as c13, stddev(deptno) as c14, stddev(empno) as c15, stddev_pop" + - "(deptno) as c16, stddev_pop(empno) as c17, stddev_samp(deptno) as c18, stddev_samp(empno) as c18, var_pop(deptno) as c19, " + - "var_pop(empno) as c20, var_samp(deptno) as c21, var_samp(empno) as c22, variance(deptno) as c23, variance(empno) as c24, " + - "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c25, COALESCE(CONV(substring(deptname, 3," + - " 2), 16, 10), '') as c26 from udfHive where empname = 'pramod' or deptname = 'network' or " + - "designation='TL' group by designation, deptname, empname with ROLLUP") - if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) { - assert(true) - } else { - assert(false) - } - checkAnswer(carbonQuery, hiveQuery) + // TO DO, need to remove this check, once JIRA for spark 2.4 has been resolved (SPARK-30974) + if(SparkUtil.isSparkVersionEqualTo("2.3")) + { + carbonQuery = sql( + "select approx_count_distinct(empname) as c1, approx_count_distinct(deptname) as c2, collect_list" + + "(empname) as c3, collect_set(deptname) as c4, corr(deptno, empno) as c5, covar_pop(deptno, empno) as c6, " + + "covar_samp(deptno, empno) as c7, grouping(designation) as c8, grouping(deptname) as c9, mean(deptno) as c10, mean" + + "(empno) as c11,skewness(deptno) as c12, skewness(empno) as c13, stddev(deptno) as c14, stddev(empno) as c15, stddev_pop" + + "(deptno) as c16, stddev_pop(empno) as c17, stddev_samp(deptno) as c18, stddev_samp(empno) as c18, var_pop(deptno) as c19, " + + "var_pop(empno) as c20, var_samp(deptno) as c21, var_samp(empno) as c22, variance(deptno) as c23, variance(empno) as c24, " + + "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c25, COALESCE(CONV(substring(deptname, 3," + + " 2), 16, 10), '') as c26 from udfValidation where empname = 'pramod' or deptname = 'network' or " + + "designation='TL' group by designation, deptname, empname with ROLLUP") + hiveQuery = sql( + "select approx_count_distinct(empname) as c1, approx_count_distinct(deptname) as c2, collect_list" + + "(empname) as c3, collect_set(deptname) as c4, corr(deptno, empno) as c5, covar_pop(deptno, empno) as c6, " + + "covar_samp(deptno, empno) as c7, grouping(designation) as c8, grouping(deptname) as c9, mean(deptno) as c10, mean" + + "(empno) as c11,skewness(deptno) as c12, skewness(empno) as c13, stddev(deptno) as c14, stddev(empno) as c15, stddev_pop" + + "(deptno) as c16, stddev_pop(empno) as c17, stddev_samp(deptno) as c18, stddev_samp(empno) as c18, var_pop(deptno) as c19, " + + "var_pop(empno) as c20, var_samp(deptno) as c21, var_samp(empno) as c22, variance(deptno) as c23, variance(empno) as c24, " + + "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c25, COALESCE(CONV(substring(deptname, 3," + + " 2), 16, 10), '') as c26 from udfHive where empname = 'pramod' or deptname = 'network' or " + + "designation='TL' group by designation, deptname, empname with ROLLUP") + if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) { + assert(true) + } else { + assert(false) + } + checkAnswer(carbonQuery, hiveQuery) + } + } test("test cast of all the above udf") { // cast all the above udf - carbonQuery = sql( - "select cast(approx_count_distinct(empname) as string), cast(approx_count_distinct(deptname) as string), collect_list" + - "(empname), collect_set(deptname), cast(corr(deptno, empno) as string), cast(covar_pop(deptno, empno) as string), " + - "cast(covar_samp(deptno, empno) as string), cast(grouping(designation) as string), cast(grouping(deptname) as string), cast(mean(deptno) as string), cast(mean" + - "(empno) as string),cast(skewness(deptno) as string), cast(skewness(empno) as string), cast(stddev(deptno) as string), cast(stddev(empno) as string), cast(stddev_pop" + - "(deptno) as string), cast(stddev_pop(empno) as string), cast(stddev_samp(deptno) as string), cast(stddev_samp(empno) as string), cast(var_pop(deptno) as string), " + - "cast(var_pop(empno) as string), cast(var_samp(deptno) as string), cast(var_samp(empno) as string), cast(variance(deptno) as string), cast(variance(empno) as string), " + - "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3," + - " 2), 16, 10), '') from udfValidation where empname = 'pramod' or deptname = 'network' or " + - "designation='TL' group by designation, deptname, empname with ROLLUP") - hiveQuery = sql( - "select cast(approx_count_distinct(empname) as string), cast(approx_count_distinct(deptname) as string), collect_list" + - "(empname), collect_set(deptname), cast(corr(deptno, empno) as string), cast(covar_pop(deptno, empno) as string), " + - "cast(covar_samp(deptno, empno) as string), cast(grouping(designation) as string), cast(grouping(deptname) as string), cast(mean(deptno) as string), cast(mean" + - "(empno) as string),cast(skewness(deptno) as string), cast(skewness(empno) as string), cast(stddev(deptno) as string), cast(stddev(empno) as string), cast(stddev_pop" + - "(deptno) as string), cast(stddev_pop(empno) as string), cast(stddev_samp(deptno) as string), cast(stddev_samp(empno) as string), cast(var_pop(deptno) as string), " + - "cast(var_pop(empno) as string), cast(var_samp(deptno) as string), cast(var_samp(empno) as string), cast(variance(deptno) as string), cast(variance(empno) as string), " + - "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3," + - " 2), 16, 10), '') from udfHive where empname = 'pramod' or deptname = 'network' or " + - "designation='TL' group by designation, deptname, empname with ROLLUP") - if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) { - assert(true) - } else { - assert(false) - } - checkAnswer(carbonQuery, hiveQuery) + // TO DO, need to remove this check, once JIRA for spark 2.4 has been resolved (SPARK-30974) + if(SparkUtil.isSparkVersionEqualTo("2.3")) + { + carbonQuery = sql( + "select cast(approx_count_distinct(empname) as string), cast(approx_count_distinct(deptname) as string), collect_list" + + "(empname), collect_set(deptname), cast(corr(deptno, empno) as string), cast(covar_pop(deptno, empno) as string), " + + "cast(covar_samp(deptno, empno) as string), cast(grouping(designation) as string), cast(grouping(deptname) as string), cast(mean(deptno) as string), cast(mean" + + "(empno) as string),cast(skewness(deptno) as string), cast(skewness(empno) as string), cast(stddev(deptno) as string), cast(stddev(empno) as string), cast(stddev_pop" + + "(deptno) as string), cast(stddev_pop(empno) as string), cast(stddev_samp(deptno) as string), cast(stddev_samp(empno) as string), cast(var_pop(deptno) as string), " + + "cast(var_pop(empno) as string), cast(var_samp(deptno) as string), cast(var_samp(empno) as string), cast(variance(deptno) as string), cast(variance(empno) as string), " + + "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3," + + " 2), 16, 10), '') from udfValidation where empname = 'pramod' or deptname = 'network' or " + + "designation='TL' group by designation, deptname, empname with ROLLUP") + hiveQuery = sql( + "select cast(approx_count_distinct(empname) as string), cast(approx_count_distinct(deptname) as string), collect_list" + + "(empname), collect_set(deptname), cast(corr(deptno, empno) as string), cast(covar_pop(deptno, empno) as string), " + + "cast(covar_samp(deptno, empno) as string), cast(grouping(designation) as string), cast(grouping(deptname) as string), cast(mean(deptno) as string), cast(mean" + + "(empno) as string),cast(skewness(deptno) as string), cast(skewness(empno) as string), cast(stddev(deptno) as string), cast(stddev(empno) as string), cast(stddev_pop" + + "(deptno) as string), cast(stddev_pop(empno) as string), cast(stddev_samp(deptno) as string), cast(stddev_samp(empno) as string), cast(var_pop(deptno) as string), " + + "cast(var_pop(empno) as string), cast(var_samp(deptno) as string), cast(var_samp(empno) as string), cast(variance(deptno) as string), cast(variance(empno) as string), " + + "COALESCE(CONV(substring(empname, 3, 2), 16, 10), ''), COALESCE(CONV(substring(deptname, 3," + + " 2), 16, 10), '') from udfHive where empname = 'pramod' or deptname = 'network' or " + + "designation='TL' group by designation, deptname, empname with ROLLUP") + if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) { + assert(true) + } else { + assert(false) + } + checkAnswer(carbonQuery, hiveQuery) + } + } test("test cast and alias with all the above udf") { // cast and alias with all the above udf - carbonQuery = sql( - "select cast(approx_count_distinct(empname) as string) as c1, cast(approx_count_distinct(deptname) as string) as c2, collect_list" + - "(empname) as c3, collect_set(deptname) as c4, cast(corr(deptno, empno) as string) as c5, cast(covar_pop(deptno, empno) as string) as c6, " + - "cast(covar_samp(deptno, empno) as string) as c7, cast(grouping(designation) as string) as c8, cast(grouping(deptname) as string) as c9, cast(mean(deptno) as string) as c10, cast(mean" + - "(empno) as string) as c11,cast(skewness(deptno) as string) as c12, cast(skewness(empno) as string) as c13, cast(stddev(deptno) as string) as c14, cast(stddev(empno) as string) as c15, cast(stddev_pop" + - "(deptno) as string) as c16, cast(stddev_pop(empno) as string) as c17, cast(stddev_samp(deptno) as string) as c18, cast(stddev_samp(empno) as string) as c19, cast(var_pop(deptno) as string) as c20, " + - "cast(var_pop(empno) as string) as c21, cast(var_samp(deptno) as string) as c22, cast(var_samp(empno) as string) as c23, cast(variance(deptno) as string) as c24, cast(variance(empno) as string) as c25, " + - "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c26, COALESCE(CONV(substring(deptname, 3," + - " 2), 16, 10), '') as c27 from udfValidation where empname = 'pramod' or deptname = 'network' or " + - "designation='TL' group by designation, deptname, empname with ROLLUP") - hiveQuery = sql( - "select cast(approx_count_distinct(empname) as string) as c1, cast(approx_count_distinct(deptname) as string) as c2, collect_list" + - "(empname) as c3, collect_set(deptname) as c4, cast(corr(deptno, empno) as string) as c5, cast(covar_pop(deptno, empno) as string) as c6, " + - "cast(covar_samp(deptno, empno) as string) as c7, cast(grouping(designation) as string) as c8, cast(grouping(deptname) as string) as c9, cast(mean(deptno) as string) as c10, cast(mean" + - "(empno) as string) as c11,cast(skewness(deptno) as string) as c12, cast(skewness(empno) as string) as c13, cast(stddev(deptno) as string) as c14, cast(stddev(empno) as string) as c15, cast(stddev_pop" + - "(deptno) as string) as c16, cast(stddev_pop(empno) as string) as c17, cast(stddev_samp(deptno) as string) as c18, cast(stddev_samp(empno) as string) as c19, cast(var_pop(deptno) as string) as c20, " + - "cast(var_pop(empno) as string) as c21, cast(var_samp(deptno) as string) as c22, cast(var_samp(empno) as string) as c23, cast(variance(deptno) as string) as c24, cast(variance(empno) as string) as c25, " + - "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c26, COALESCE(CONV(substring(deptname, 3," + - " 2), 16, 10), '') as c27 from udfHive where empname = 'pramod' or deptname = 'network' or " + - "designation='TL' group by designation, deptname, empname with ROLLUP") - if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) { - assert(true) - } else { - assert(false) - } - checkAnswer(carbonQuery, hiveQuery) + // TO DO, need to remove this check, once JIRA for spark 2.4 has been resolved (SPARK-30974) + if(SparkUtil.isSparkVersionEqualTo("2.3")) + { + carbonQuery = sql( + "select cast(approx_count_distinct(empname) as string) as c1, cast(approx_count_distinct(deptname) as string) as c2, collect_list" + + "(empname) as c3, collect_set(deptname) as c4, cast(corr(deptno, empno) as string) as c5, cast(covar_pop(deptno, empno) as string) as c6, " + + "cast(covar_samp(deptno, empno) as string) as c7, cast(grouping(designation) as string) as c8, cast(grouping(deptname) as string) as c9, cast(mean(deptno) as string) as c10, cast(mean" + + "(empno) as string) as c11,cast(skewness(deptno) as string) as c12, cast(skewness(empno) as string) as c13, cast(stddev(deptno) as string) as c14, cast(stddev(empno) as string) as c15, cast(stddev_pop" + + "(deptno) as string) as c16, cast(stddev_pop(empno) as string) as c17, cast(stddev_samp(deptno) as string) as c18, cast(stddev_samp(empno) as string) as c19, cast(var_pop(deptno) as string) as c20, " + + "cast(var_pop(empno) as string) as c21, cast(var_samp(deptno) as string) as c22, cast(var_samp(empno) as string) as c23, cast(variance(deptno) as string) as c24, cast(variance(empno) as string) as c25, " + + "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c26, COALESCE(CONV(substring(deptname, 3," + + " 2), 16, 10), '') as c27 from udfValidation where empname = 'pramod' or deptname = 'network' or " + + "designation='TL' group by designation, deptname, empname with ROLLUP") + hiveQuery = sql( + "select cast(approx_count_distinct(empname) as string) as c1, cast(approx_count_distinct(deptname) as string) as c2, collect_list" + + "(empname) as c3, collect_set(deptname) as c4, cast(corr(deptno, empno) as string) as c5, cast(covar_pop(deptno, empno) as string) as c6, " + + "cast(covar_samp(deptno, empno) as string) as c7, cast(grouping(designation) as string) as c8, cast(grouping(deptname) as string) as c9, cast(mean(deptno) as string) as c10, cast(mean" + + "(empno) as string) as c11,cast(skewness(deptno) as string) as c12, cast(skewness(empno) as string) as c13, cast(stddev(deptno) as string) as c14, cast(stddev(empno) as string) as c15, cast(stddev_pop" + + "(deptno) as string) as c16, cast(stddev_pop(empno) as string) as c17, cast(stddev_samp(deptno) as string) as c18, cast(stddev_samp(empno) as string) as c19, cast(var_pop(deptno) as string) as c20, " + + "cast(var_pop(empno) as string) as c21, cast(var_samp(deptno) as string) as c22, cast(var_samp(empno) as string) as c23, cast(variance(deptno) as string) as c24, cast(variance(empno) as string) as c25, " + + "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c26, COALESCE(CONV(substring(deptname, 3," + + " 2), 16, 10), '') as c27 from udfHive where empname = 'pramod' or deptname = 'network' or " + + "designation='TL' group by designation, deptname, empname with ROLLUP") + if (testSecondaryIndexForORFilterPushDown.isFilterPushedDownToSI(carbonQuery.queryExecution.executedPlan)) { + assert(true) + } else { + assert(false) + } + checkAnswer(carbonQuery, hiveQuery) + } + } test("test udf on filter - concat") { diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCTASWithSecondaryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCTASWithSecondaryIndex.scala index 2dadee3..8d0da96 100644 --- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCTASWithSecondaryIndex.scala +++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCTASWithSecondaryIndex.scala @@ -19,6 +19,7 @@ package org.apache.carbondata.spark.testsuite.secondaryindex import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.util.SparkUtil import org.scalatest.BeforeAndAfterAll /** @@ -201,20 +202,36 @@ class TestCTASWithSecondaryIndex extends QueryTest with BeforeAndAfterAll{ } test("test ctas with carbon table with SI having cast of UDF functions") { - sql("drop table if exists carbon_table1") - val query = "select cast(approx_count_distinct(empname) as string) as c1, cast(approx_count_distinct(deptname) as string) as c2,cast(corr(deptno, empno) as string) as c5, cast(covar_pop(deptno, empno) as string) as c6, " + - "cast(covar_samp(deptno, empno) as string) as c7, cast(grouping(designation) as string) as c8, cast(grouping(deptname) as string) as c9, cast(mean(deptno) as string) as c10, cast(mean" + - "(empno) as string) as c11,cast(skewness(deptno) as string) as c12, cast(skewness(empno) as string) as c13, cast(stddev(deptno) as string) as c14, cast(stddev(empno) as string) as c15, cast(stddev_pop" + - "(deptno) as string) as c16, cast(stddev_pop(empno) as string) as c17, cast(stddev_samp(deptno) as string) as c18, cast(stddev_samp(empno) as string) as c19, cast(var_pop(deptno) as string) as c20, " + - "cast(var_pop(empno) as string) as c21, cast(var_samp(deptno) as string) as c22, cast(var_samp(empno) as string) as c23, cast(variance(deptno) as string) as c24, cast(variance(empno) as string) as c25, " + - "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c26, COALESCE(CONV(substring(deptname, 3," + - " 2), 16, 10), '') as c27 from udfValidation where empname = 'pramod' or deptname = 'network' or " + - "designation='TL' group by designation, deptname, empname with ROLLUP" - val df = sql(s"explain extended $query").collect() - df(0).getString(0).contains("default.ind_i1 ") - sql(s"create table carbon_table1 stored as carbondata as $query") - checkAnswer(sql("select count(*) from carbon_table1"), Seq(Row(15))) - sql("drop table if exists carbon_table1") + if(SparkUtil.isSparkVersionEqualTo("2.3")) { + sql("drop table if exists carbon_table1") + val query = + "select cast(approx_count_distinct(empname) as string) as c1, cast(approx_count_distinct" + + "(deptname) as string) as c2,cast(corr(deptno, empno) as string) as c5, cast(covar_pop" + + "(deptno, empno) as string) as c6, " + + "cast(covar_samp(deptno, empno) as string) as c7, cast(grouping(designation) as string) " + + "as c8, cast(grouping(deptname) as string) as c9, cast(mean(deptno) as string) as c10, " + + "cast(mean" + + "(empno) as string) as c11,cast(skewness(deptno) as string) as c12, cast(skewness(empno) " + + "as string) as c13, cast(stddev(deptno) as string) as c14, cast(stddev(empno) as string) " + + "as c15, cast(stddev_pop" + + "(deptno) as string) as c16, cast(stddev_pop(empno) as string) as c17, cast(stddev_samp" + + "(deptno) as string) as c18, cast(stddev_samp(empno) as string) as c19, cast(var_pop" + + "(deptno) as string) as c20, " + + "cast(var_pop(empno) as string) as c21, cast(var_samp(deptno) as string) as c22, cast" + + "(var_samp(empno) as string) as c23, cast(variance(deptno) as string) as c24, cast" + + "(variance(empno) as string) as c25, " + + "COALESCE(CONV(substring(empname, 3, 2), 16, 10), '') as c26, COALESCE(CONV(substring" + + "(deptname, 3," + + " 2), 16, 10), '') as c27 from udfValidation where empname = 'pramod' or deptname = " + + "'network' or " + + "designation='TL' group by designation, deptname, empname with ROLLUP" + val df = sql(s"explain extended $query").collect() + df(0).getString(0).contains("default.ind_i1 ") + sql(s"create table carbon_table1 stored as carbondata as $query") + checkAnswer(sql("select count(*) from carbon_table1"), Seq(Row(15))) + sql("drop table if exists carbon_table1") + } + } test("test ctas with carbon table with SI having concat function") { diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala index a55b477..8cf3e7b 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand} import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.sql.execution.datasources.RefreshTable +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.SparkUtil import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -35,7 +37,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.SchemaReader import org.apache.carbondata.core.metadata.schema.partition.PartitionType -import org.apache.carbondata.core.metadata.schema.table. TableInfo +import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath @@ -154,18 +156,35 @@ case class RefreshCarbonTableCommand( tableInfo: TableInfo, tablePath: String)(sparkSession: SparkSession): Any = { val operationContext = new OperationContext + var allowCreateTableNonEmptyLocation: String = null + val allowCreateTableNonEmptyLocationConf = + "spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation" try { val refreshTablePreExecutionEvent: RefreshTablePreExecutionEvent = new RefreshTablePreExecutionEvent(sparkSession, tableInfo.getOrCreateAbsoluteTableIdentifier()) + if (SparkUtil.isSparkVersionXandAbove("2.4")) { + // During refresh table, when this option is set to true, creating managed tables with + // nonempty location is allowed. Otherwise, an analysis exception is thrown. + // https://kb.databricks.com/jobs/spark-overwrite-cancel.html + allowCreateTableNonEmptyLocation = sparkSession.sessionState + .conf.getConfString(allowCreateTableNonEmptyLocationConf) + sparkSession.sessionState.conf.setConfString(allowCreateTableNonEmptyLocationConf, "true") + } OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext) CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath)) .run(sparkSession) } catch { case e: AnalysisException => throw e - case e: Exception => - throw e + case e: Exception => throw e + } finally { + if (SparkUtil.isSparkVersionXandAbove("2.4")) { + // Set it back to default + sparkSession.sessionState.conf + .setConfString(allowCreateTableNonEmptyLocationConf, allowCreateTableNonEmptyLocation) + } } + val refreshTablePostExecutionEvent: RefreshTablePostExecutionEvent = new RefreshTablePostExecutionEvent(sparkSession, tableInfo.getOrCreateAbsoluteTableIdentifier()) diff --git a/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala index b084235..7b5589d 100644 --- a/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala +++ b/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala @@ -25,8 +25,9 @@ import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexRepl import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, Expression, ExpressionSet, ExprId, NamedExpression, ScalaUDF, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression} import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.hive.HiveExternalCatalog @@ -62,6 +63,10 @@ object CarbonToSparkAdapter { qualifier = Some(newSubsume)) } + def getOutput(subQueryAlias: SubqueryAlias): Seq[Attribute] = { + subQueryAlias.output + } + def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = { ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes) } diff --git a/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala index 76ee6d7..7bd1056 100644 --- a/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala +++ b/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala @@ -23,11 +23,11 @@ import org.apache.spark.SparkContext import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, Expression, ExpressionSet, ExprId, NamedExpression, ScalaUDF, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.hive.{CarbonMVRules, HiveExternalCatalog} @@ -161,6 +161,11 @@ object CarbonToSparkAdapter { storageFormat.copy(properties = map, locationUri = Some(new URI(tablePath))) } + def getOutput(subQueryAlias: SubqueryAlias): Seq[Attribute] = { + val newAlias = Seq(subQueryAlias.name.identifier) + subQueryAlias.child.output.map(_.withQualifier(newAlias)) + } + def getHiveExternalCatalog(sparkSession: SparkSession) = sparkSession.sessionState.catalog.externalCatalog .asInstanceOf[ExternalCatalogWithListener] diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala index 905552c..f80316d 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala @@ -26,6 +26,7 @@ import org.apache.commons.codec.binary.{Base64, Hex} import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.util.SparkUtil import org.scalatest.BeforeAndAfterAll /** @@ -1608,8 +1609,6 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll { | from uniqdata """.stripMargin).show() } - assert(e1.getMessage.contains("cannot resolve 'avg(substring(uniqdata.`CUST_NAME`, 1, 2))' due to data type mismatch: function average requires numeric types, not BinaryType")) - val e2 = intercept[Exception] { sql( s""" @@ -1624,7 +1623,6 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll { | where CUST_ID IS NULL or DOB IS NOT NULL or BIGINT_COLUMN1 =1233720368578 or DECIMAL_COLUMN1 = 12345678901.1234000058 or Double_COLUMN1 = 1.12345674897976E10 or INTEGER_COLUMN1 IS NULL limit 10 """.stripMargin) } - assert(e2.getMessage.contains("cannot resolve 'avg(substring(uniqdata.`CUST_NAME`, 1, 2))' due to data type mismatch: function average requires numeric types, not BinaryType")) val e3 = intercept[Exception] { sql( @@ -1640,8 +1638,19 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll { | where CUST_ID IS NULL or DOB IS NOT NULL or BIGINT_COLUMN1 =1233720368578 or DECIMAL_COLUMN1 = 12345678901.1234000058 or Double_COLUMN1 = 1.12345674897976E10 or INTEGER_COLUMN1 IS NULL limit 10 """.stripMargin) } - assert(e3.getMessage.contains("cannot resolve 'avg(substring(uniqdata.`CUST_NAME`, 1, 2))' due to data type mismatch: function average requires numeric types, not BinaryType")) - + // Exceptions are specific to spark versions + val message_2_3 = "cannot resolve 'avg(substring(uniqdata.`CUST_NAME`, 1, 2))' due to data type mismatch: function average requires numeric types, not BinaryType" + val message_2_4 = "cannot resolve 'avg(substring(default.uniqdata.`CUST_NAME`, 1, 2))' due to data type mismatch: function average requires numeric types, not binary" + if(SparkUtil.isSparkVersionEqualTo("2.3")) { + assert(e1.getMessage.contains(message_2_3)) + assert(e2.getMessage.contains(message_2_3)) + assert(e3.getMessage.contains(message_2_3)) + } + else if (SparkUtil.isSparkVersionXandAbove("2.4")) { + assert(e1.getMessage.contains(message_2_4)) + assert(e2.getMessage.contains(message_2_4)) + assert(e3.getMessage.contains(message_2_4)) + } } test("test binary insert with int value") { diff --git a/integration/spark/src/test/scala/org/apache/spark/util/SparkUtilTest.scala b/integration/spark/src/test/scala/org/apache/spark/util/SparkUtilTest.scala index 4810db1..e02681a 100644 --- a/integration/spark/src/test/scala/org/apache/spark/util/SparkUtilTest.scala +++ b/integration/spark/src/test/scala/org/apache/spark/util/SparkUtilTest.scala @@ -34,8 +34,8 @@ class SparkUtilTest extends FunSuite{ } else { assert(SparkUtil.isSparkVersionXandAbove("2.1")) assert(SparkUtil.isSparkVersionXandAbove("2.2")) - assert(SparkUtil.isSparkVersionXandAbove("2.3")) - assert(!SparkUtil.isSparkVersionXandAbove("2.4")) + assert(SparkUtil.isSparkVersionXandAbove("2.3") || + SparkUtil.isSparkVersionXandAbove("2.4")) } } @@ -51,8 +51,8 @@ class SparkUtilTest extends FunSuite{ } else { assert(!SparkUtil.isSparkVersionEqualTo("2.1")) assert(!SparkUtil.isSparkVersionEqualTo("2.2")) - assert(SparkUtil.isSparkVersionEqualTo("2.3")) - assert(!SparkUtil.isSparkVersionEqualTo("2.4")) + assert(SparkUtil.isSparkVersionEqualTo("2.3") || + SparkUtil.isSparkVersionXandAbove("2.4")) } } } \ No newline at end of file diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala index b23f350..57518ea 100644 --- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala +++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala @@ -367,7 +367,7 @@ object MVHelper { private def updateColumnName(attr: Attribute, counter: Int): String = { val name = getUpdatedName(attr.name, counter) val value = attr.qualifier.map(qualifier => qualifier + "_" + name) - if (value.nonEmpty) value.head else name + if (value.nonEmpty) value.last else name } // Return all relations involved in the plan diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala index c127262..47c7f23 100644 --- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala +++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala @@ -128,14 +128,14 @@ class MVUtil { qualifier = if (attr.qualifier.headOption.get.startsWith("gen_sub")) { Some(catalogTable.identifier.table) } else { - attr.qualifier.headOption + attr.qualifier.lastOption } } fieldToDataMapFieldMap += getFieldToDataMapFields( attr.name, attr.dataType, - qualifier.headOption, + qualifier.lastOption, "", arrayBuffer, catalogTable.identifier.table) diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala index f4e64fa..56d0324 100644 --- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala +++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala @@ -19,10 +19,10 @@ package org.apache.carbondata.mv.rewrite import java.util.concurrent.locks.ReentrantReadWriteLock -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{CarbonToSparkAdapter, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.FindDataSourceTable import org.apache.carbondata.core.datamap.DataMapCatalog @@ -119,10 +119,15 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession) mvSession.sessionState.optimizer.execute(planToRegister)).next().semiHarmonized val signature = modularPlan.signature val identifier = dataMapSchema.getRelationIdentifier - val output = new FindDataSourceTable(sparkSession) + val plan = new FindDataSourceTable(sparkSession) .apply(sparkSession.sessionState.catalog - .lookupRelation(TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName)))) - .output + .lookupRelation(TableIdentifier(identifier.getTableName, + Some(identifier.getDatabaseName)))) + val output = if (plan.isInstanceOf[SubqueryAlias]) { + CarbonToSparkAdapter.getOutput(plan.asInstanceOf[SubqueryAlias]) + } else { + plan.output + } val relation = ModularRelation(identifier.getDatabaseName, identifier.getTableName, output, diff --git a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala index b694e78..5fbfb9b 100644 --- a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala +++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala @@ -55,7 +55,7 @@ object SimpleModularizer extends ModularPatterns { val makeupmap: Map[Int, String] = children.zipWithIndex.flatMap { case (child, i) => aq.find(child.outputSet.contains(_)) - .flatMap(_.qualifier.headOption) + .flatMap(_.qualifier.lastOption) .map((i, _)) }.toMap g.copy(child = s.copy(aliasMap = makeupmap ++ aliasmap)) diff --git a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala index d102916..56c5a50 100644 --- a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala +++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala @@ -167,7 +167,7 @@ object ExtractSelectModule extends PredicateHelper { children.zipWithIndex.flatMap { case (child, i) => aq.find(child.outputSet.contains(_)) - .flatMap(_.qualifier.headOption) + .flatMap(_.qualifier.lastOption) .map((i, _)) }.toMap } diff --git a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala index af8d184..ff50579 100644 --- a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala +++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala @@ -204,7 +204,7 @@ trait Printers { s.child match { case a: Alias => val qualifierPrefix = a.qualifier - .map(_ + ".").headOption.getOrElse("") + .map(_ + ".").lastOption.getOrElse("") s"$qualifierPrefix${ quoteIdentifier(a .name) @@ -221,7 +221,7 @@ trait Printers { s.child match { case a: Alias => val qualifierPrefix = a.qualifier.map(_ + ".") - .headOption.getOrElse("") + .lastOption.getOrElse("") s"$qualifierPrefix${ quoteIdentifier(a.name) }" case other => other.sql