This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new bd54ce8  [CARBONDATA-3628] Support alter hive table add array and map type column
bd54ce8 is described below

commit bd54ce83d819d8350b1a594c3007ac747d5485dc
Author: IceMimosa <chk19940...@gmail.com>
AuthorDate: Tue Dec 24 11:07:30 2019 +0800

    [CARBONDATA-3628] Support alter hive table add array and map type column
    
    Support adding array and map data type columns via ALTER TABLE
    
    This closes #3529
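    
    For illustration only (not part of this patch): a minimal sketch of the DDL this
    change enables, assuming Spark 2.2+ and a Hive-enabled SparkSession as exercised
    in the test suite; the table and column names below are hypothetical.
    
        import org.apache.spark.sql.SparkSession
    
        object AlterHiveAddComplexColumns {
          def main(args: Array[String]): Unit = {
            // Assumed setup: a Hive-enabled SparkSession (the test suite issues the
            // equivalent statements through its own integrated Carbon session).
            val spark = SparkSession.builder()
              .appName("alter-hive-add-complex-columns")
              .enableHiveSupport()
              .getOrCreate()
    
            spark.sql("create table if not exists alter_hive(name string) stored as rcfile")
            // With this change, map and array columns can be added to a hive table
            spark.sql("alter table alter_hive add columns (var map<string, string>)")
            spark.sql("alter table alter_hive add columns (loves array<string>)")
            spark.sql("insert into alter_hive select 'abc', map('age', '10'), array('a', 'b')")
            spark.sql("select * from alter_hive").show(false)
          }
        }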
---
 .../cluster/sdv/generated/AlterTableTestCase.scala | 26 +++++++++++++++++++++-
 .../cluster/sdv/generated/SDKwriterTestCase.scala  |  3 +--
 .../apache/spark/sql/common/util/QueryTest.scala   | 10 ++++-----
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 11 ++++-----
 .../spark/sql/execution/strategy/DDLStrategy.scala |  7 +++---
 5 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
index 297ff04..cc34df5 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
@@ -1022,7 +1022,31 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
       assert(exception.getMessage.contains("Unsupported alter operation on hive table"))
     } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       sql("alter table alter_hive add columns(add string)")
-      sql("insert into alter_hive select 'abc','banglore'")
+      sql("alter table alter_hive add columns (var map<string, string>)")
+      sql("insert into alter_hive select 
'abc','banglore',map('age','10','birth','2020')")
+      checkAnswer(
+        sql("select * from alter_hive"),
+        Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020")))
+      )
+    }
+  }
+
+  test("Alter table add column for hive partitioned table for spark version 
above 2.1") {
+    sql("drop table if exists alter_hive")
+    sql("create table alter_hive(name string) stored as rcfile partitioned by 
(dt string)")
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      sql("alter table alter_hive add columns(add string)")
+      sql("alter table alter_hive add columns (var map<string, string>)")
+      sql("alter table alter_hive add columns (loves array<string>)")
+      sql(
+        s"""
+           |insert into alter_hive partition(dt='par')
+           |select 'abc', 'banglore', map('age', '10', 'birth', '2020'), array('a', 'b', 'c')
+         """.stripMargin)
+      checkAnswer(
+        sql("select * from alter_hive where dt='par'"),
+        Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020"), Seq("a", "b", "c"), "par"))
+      )
     }
   }
 
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
index d6a9413..82541b2 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -146,8 +146,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
   }
 
   def deleteFile(path: String, extension: String): Unit = {
-    val file: CarbonFile = FileFactory
-      .getCarbonFile(path, FileFactory.getFileType(path))
+    val file: CarbonFile = FileFactory.getCarbonFile(path)
 
     for (eachDir <- file.listFiles) {
       if (!eachDir.isDirectory) {
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 9d4fe79..eca20ed 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -88,14 +88,13 @@ class QueryTest extends PlanTest with Suite {
 
   protected def checkAnswer(carbon: String, hive: String, uniqueIdentifier: String): Unit = {
     val path = TestQueryExecutor.hiveresultpath + "/" + uniqueIdentifier
-    if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-      val objinp = new ObjectInputStream(FileFactory
-        .getDataInputStream(path, FileFactory.getFileType(path)))
+    if (FileFactory.isFileExist(path)) {
+      val objinp = new ObjectInputStream(FileFactory.getDataInputStream(path))
       val rows = objinp.readObject().asInstanceOf[Array[Row]]
       objinp.close()
       QueryTest.checkAnswer(sql(carbon), rows) match {
         case Some(errorMessage) => {
-          FileFactory.deleteFile(path, FileFactory.getFileType(path))
+          FileFactory.deleteFile(path)
           writeAndCheckAnswer(carbon, hive, path)
         }
         case None =>
@@ -107,8 +106,7 @@ class QueryTest extends PlanTest with Suite {
 
   private def writeAndCheckAnswer(carbon: String, hive: String, path: String): Unit = {
     val rows = sql(hive).collect()
-    val obj = new ObjectOutputStream(FileFactory.getDataOutputStream(path, FileFactory
-      .getFileType(path)))
+    val obj = new ObjectOutputStream(FileFactory.getDataOutputStream(path))
     obj.writeObject(rows)
     obj.close()
     checkAnswer(sql(carbon), rows)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 0f31471..71e91cc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.spark.rdd
 
 import java.io.IOException
 import java.util
-import java.util.{Collections, List, Map}
+import java.util.{Collections, List}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable
@@ -32,10 +32,11 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
 import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.sql.util.{CarbonException, SparkTypeConverter}
+import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
@@ -64,8 +65,8 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.load.{ByteArrayOrdering, DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
+import org.apache.carbondata.spark.load.{DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
 
 class CarbonMergerRDD[K, V](
     @transient private val ss: SparkSession,
@@ -680,7 +681,7 @@ class CarbonMergerRDD[K, V](
       partitionNames = null,
       splits = allSplits)
     val objectOrdering: Ordering[Object] = createOrderingForColumn(rangeColumn)
-    val sparkDataType = Util.convertCarbonToSparkDataType(dataType)
+    val sparkDataType = CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(dataType)
     // Change string type to support all types
     val sampleRdd = scanRdd
       .map(row => (row.get(0, sparkDataType), null))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 7c8993e..a851bc3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.strategy
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -35,11 +36,9 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, SparkUtil}
 
-import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.Util
 
   /**
    * Carbon strategies for ddl commands
@@ -176,8 +175,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
             .map {
               a =>
-                StructField(a.column,
-                  Util.convertCarbonToSparkDataType(DataTypeUtil.valueOf(a.dataType.get)))
+                StructField(a.column, CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(
+                  DataTypeUtil.valueOf(a.dataType.get)))
             }
           val identifier = TableIdentifier(
             alterTableAddColumnsModel.tableName,
