Repository: carbondata Updated Branches: refs/heads/master f962e41b7 -> 8320918e5
[CARBONDATA-2950] alter add column of hive table fails from carbon for spark versions above 2.1 Problem: spark does not support add columns in spark-2.1, but it is supported in 2.2 and above. When add column is fired for a hive table in carbon session, for spark version above 2.1, it throws an error as unsupported operation on hive table Solution: when alter add columns for hive is fired for spark-2.2 and above, it should not throw any exception and it should pass This closes #2735 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8320918e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8320918e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8320918e Branch: refs/heads/master Commit: 8320918e55b393fedc946e4543843a72712d9199 Parents: f962e41 Author: akashrn5 <[email protected]> Authored: Wed Sep 19 19:51:39 2018 +0530 Committer: kumarvishal09 <[email protected]> Committed: Fri Sep 21 21:55:06 2018 +0530 ---------------------------------------------------------------------- .../sdv/generated/AlterTableTestCase.scala | 18 ++++++++++++- .../lucene/LuceneFineGrainDataMapSuite.scala | 27 -------------------- .../org/apache/carbondata/spark/util/Util.java | 2 +- .../spark/util/CarbonReflectionUtils.scala | 15 +++++++++++ .../sql/execution/strategy/DDLStrategy.scala | 21 +++++++++++++-- 5 files changed, 52 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala index 4e53ea3..90fa602 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala @@ -18,12 +18,14 @@ package org.apache.carbondata.cluster.sdv.generated +import org.apache.spark.SPARK_VERSION import org.apache.spark.sql.Row import org.apache.spark.sql.common.util._ -import org.apache.spark.sql.test.TestQueryExecutor +import org.apache.spark.util.SparkUtil import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.constants.LoggerAction +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties @@ -1000,6 +1002,20 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll { sql(s"""drop table if exists uniqdata59""").collect } + test("Alter table add column for hive table for spark version above 2.1") { + sql("drop table if exists alter_hive") + sql("create table alter_hive(name string)") + if(SPARK_VERSION.startsWith("2.1")) { + val exception = intercept[MalformedCarbonCommandException] { + sql("alter table alter_hive add columns(add string)") + } + assert(exception.getMessage.contains("Unsupported alter operation on hive table")) + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + sql("alter table alter_hive add columns(add string)") + sql("insert into alter_hive select 'abc','banglore'") + } + } + val prop = CarbonProperties.getInstance() val p1 = prop.getProperty("carbon.horizontal.compaction.enable", CarbonCommonConstants.defaultIsHorizontalCompactionEnabled) val p2 = prop.getProperty("carbon.horizontal.update.compaction.threshold", 
CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION) http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala index 0c6134b..2e3019a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala @@ -415,33 +415,6 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS datamap_test_table") } - test("test lucene fine grain data map with ALTER ADD and DROP Table COLUMN on Lucene DataMap") { - sql("DROP TABLE IF EXISTS datamap_test_table") - sql( - """ - | CREATE TABLE datamap_test_table(id INT, name STRING, city STRING, age INT) - | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT') - """.stripMargin) - sql( - s""" - | CREATE DATAMAP dm2 ON TABLE datamap_test_table - | USING 'lucene' - | DMProperties('INDEX_COLUMNS'='name , city') - """.stripMargin) - val exception_add_column: Exception = intercept[MalformedCarbonCommandException] { - sql("alter table dm2 add columns(city1 string)") - } - assert(exception_add_column.getMessage - .contains("Unsupported alter operation on hive table")) - val exception_drop_column: Exception = intercept[MalformedCarbonCommandException] { - sql("alter table dm2 drop columns(name)") - } - assert(exception_drop_column.getMessage - .contains("Unsupported alter operation on hive table")) - sql("drop 
datamap if exists dm2 on table datamap_test_table") - } - test("test Clean Files and check Lucene DataMap") { sql("DROP TABLE IF EXISTS datamap_test_table") sql( http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java index 832a1b2..d1193f5 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java @@ -59,7 +59,7 @@ public class Util { return false; } - private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType( + public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType( DataType carbonDataType) { if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) { return DataTypes.StringType; http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala index 32cd201..9955286 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala @@ -33,8 +33,10 @@ import org.apache.spark.sql.catalyst.parser.AstBuilder import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias} import 
org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation} import org.apache.spark.sql.sources.{BaseRelation, Filter} +import org.apache.spark.sql.types.StructField import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -301,6 +303,19 @@ object CarbonReflectionUtils { } } + /** + * method to invoke alter table add columns for hive table from carbon session + * @param table + * @param colsToAdd + * @return + */ + def invokeAlterTableAddColumn(table: TableIdentifier, + colsToAdd: Seq[StructField]): Object = { + val caseClassName = "org.apache.spark.sql.execution.command.AlterTableAddColumnsCommand" + CarbonReflectionUtils.createObject(caseClassName, table, colsToAdd) + ._1.asInstanceOf[RunnableCommand] + } + def createObject(className: String, conArgs: Object*): (Any, Class[_]) = { val clazz = Utils.classForName(className) val ctor = clazz.getConstructors.head http://git-wip-us.apache.org/repos/asf/carbondata/blob/8320918e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 4499b19..f9046f0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -32,12 +32,14 @@ import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTa import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable} import org.apache.spark.sql.hive.{CarbonRelation, 
CreateCarbonSourceTableAsSelectCommand} import org.apache.spark.sql.parser.CarbonSpark2SqlParser -import org.apache.spark.util.{CarbonReflectionUtils, FileUtils} +import org.apache.spark.sql.types.StructField +import org.apache.spark.util.{CarbonReflectionUtils, FileUtils, SparkUtil} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.spark.util.Util /** * Carbon strategies for ddl commands @@ -152,6 +154,21 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { } else { ExecutedCommandExec(addColumn) :: Nil } + // TODO: remove this else if check once the 2.1 version is unsupported by carbon + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { + val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols) + .map { + a => + StructField(a.column, + Util.convertCarbonToSparkDataType(DataTypeUtil.valueOf(a.dataType.get))) + } + val identifier = TableIdentifier( + alterTableAddColumnsModel.tableName, + alterTableAddColumnsModel.databaseName) + ExecutedCommandExec(CarbonReflectionUtils + .invokeAlterTableAddColumn(identifier, structField).asInstanceOf[RunnableCommand]) :: + Nil + // TODO: remove this else check once the 2.1 version is unsupported by carbon } else { throw new MalformedCarbonCommandException("Unsupported alter operation on hive table") }
