Repository: carbondata
Updated Branches:
  refs/heads/master a26be1b18 -> 0c9a60e01


[CARBONDATA-2738]Block Preaggregate, Compaction, Dictionary Exclude/Include
for child columns for Complex datatype

Block Preaggregate, Compaction, Dictionary Exclude/Include for child columns
and Update Complex datatype columns for Complex datatype

This closes #2501


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0c9a60e0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0c9a60e0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0c9a60e0

Branch: refs/heads/master
Commit: 0c9a60e017211a27033eb770c416cf3ac303d9d4
Parents: a26be1b
Author: Indhumathi27 <indhumathi...@gmail.com>
Authored: Thu Jul 12 21:11:11 2018 +0530
Committer: kunal642 <kunalkapoor...@gmail.com>
Committed: Wed Jul 18 10:45:52 2018 +0530

----------------------------------------------------------------------
 .../complexType/TestComplexDataType.scala       | 176 ++++++++++++++++++-
 ...ataWithMalformedCarbonCommandException.scala |  12 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  14 +-
 .../sql/events/MergeIndexEventListener.scala    |   6 +-
 .../CarbonAlterTableCompactionCommand.scala     |   6 +
 .../CarbonProjectForUpdateCommand.scala         |  10 +-
 .../preaaggregate/PreAggregateUtil.scala        |  10 +-
 .../strategy/StreamingTableStrategy.scala       |   2 +-
 .../spark/sql/optimizer/CarbonIUDRule.scala     |   9 +-
 9 files changed, 224 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index 45a9c7a..6470648 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -4,10 +4,11 @@ import java.sql.Timestamp
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 
@@ -18,6 +19,16 @@ import org.apache.carbondata.core.util.CarbonProperties
 
 class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
 
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS table1")
+    sql("DROP TABLE IF EXISTS test")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS table1")
+    sql("DROP TABLE IF EXISTS test")
+  }
+
   test("test Projection PushDown for Struct - Integer type") {
     sql("DROP TABLE IF EXISTS table1")
     sql(
@@ -712,5 +723,166 @@ class TestComplexDataType extends QueryTest with 
BeforeAndAfterAll {
     checkAnswer(sql("select 
a.b,id,a.c,person.detail[0],d.e,d.f,person.detail[1],id from 
test"),Seq(Row(2,1,3,5,3,2,6,1)))
     checkAnswer(sql("select 
a.b,id,a.c,person.detail[0],d.e,d.f,person.detail[1],id,1,a.b from 
test"),Seq(Row(2,1,3,5,3,2,6,1,1,2)))
   }
-  
+
+  test("test block Update for complex datatype") {
+    sql("DROP TABLE IF EXISTS test")
+    sql("create table test(id int,a struct<b:int,c:int>,d array<int>) stored 
by 'carbondata'")
+    sql("insert into test values(1,'2$3',4)")
+    val structException = intercept[UnsupportedOperationException](
+    sql("update test set(a.b)=(4) where id=1").show(false))
+    assertResult("Unsupported operation on Complex data 
type")(structException.getMessage)
+    val arrayException = intercept[UnsupportedOperationException](
+    sql("update test set(a)=(4) where id=1").show(false))
+    assertResult("Unsupported operation on Complex data 
type")(arrayException.getMessage)
+  }
+
+  test("test block partition column") {
+    sql("DROP TABLE IF EXISTS test")
+    val arrayException = intercept[AnalysisException](
+    sql("""
+          | CREATE TABLE IF NOT EXISTS test
+          | (
+          | id Int,
+          | vin string,
+          | logdate Timestamp,
+          | phonenumber Long,
+          | country array<string>,
+          | salary Int
+          | )
+          | PARTITIONED BY (area array<string>)
+          | STORED BY 'carbondata'
+        """.stripMargin))
+    assertResult("Cannot use array<string> for partition 
column;")(arrayException.getMessage)
+    sql("DROP TABLE IF EXISTS test")
+    val structException = intercept[AnalysisException](
+      sql("""
+            | CREATE TABLE IF NOT EXISTS test
+            | (
+            | id Int,
+            | vin string,
+            | logdate Timestamp,
+            | phonenumber Long,
+            | country array<string>,
+            | salary Int
+            | )
+            | PARTITIONED BY (area struct<b:int>)
+            | STORED BY 'carbondata'
+          """.stripMargin)
+    )
+    assertResult("Cannot use struct<b:int> for partition 
column;")(structException.getMessage)
+  }
+
+  test("test block preaggregate") {
+    sql("DROP TABLE IF EXISTS test")
+    sql("create table test(id int,a struct<b:int>) stored by 'carbondata'")
+    sql("insert into test values (1,2)")
+    sql("insert into test values (1,2)")
+    sql("insert into test values (1,2)")
+    val structException = intercept[UnsupportedOperationException](
+      sql("create datamap preagg_sum on table test using 'preaggregate' as 
select id,sum(a.b) from test group by id"))
+    assertResult("Preaggregate is unsupported for ComplexData type column: 
a.b")(structException.getMessage)
+    sql("DROP TABLE IF EXISTS test")
+    sql("create table test(id int,a array<int>) stored by 'carbondata'")
+    sql("insert into test values (1,2)")
+    val arrayException = intercept[UnsupportedOperationException](
+      sql("create datamap preagg_sum on table test using 'preaggregate' as 
select id,sum(a[0]) from test group by id"))
+    assertResult("Preaggregate is unsupported for ComplexData type column: 
a[0]")(arrayException.getMessage)
+  }
+
+  test("test block dictionary exclude for child column") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,a 
struct<b:int,c:string,d:int,e:string,f:struct<g:int," +
+      "h:string,i:int>,j:int>) stored " +
+      "by " +
+      "'carbondata' tblproperties('dictionary_exclude'='a')")
+    sql("insert into table1 values(1,'1$abc$2$efg$3:mno:4$5')")
+    checkAnswer(sql("select a.b from table1"), Seq(Row(1)))
+    sql("DROP TABLE IF EXISTS table1")
+    val structException = intercept[MalformedCarbonCommandException](
+    sql(
+      "create table table1 (roll int,a 
struct<b:int,c:string,d:int,e:string,f:struct<g:int," +
+      "h:string,i:int>,j:int>) stored " +
+      "by " +
+      "'carbondata' tblproperties('dictionary_exclude'='a.b')"))
+    assertResult(
+      "DICTIONARY_EXCLUDE column: a.b does not exist in table or unsupported 
for complex child " +
+      "column. Please check create table statement.")(
+      structException.getMessage)
+    sql("DROP TABLE IF EXISTS table1")
+    val arrayException = intercept[MalformedCarbonCommandException](
+      sql(
+        "create table table1 (roll int,a array<int>) stored " +
+        "by " +
+        "'carbondata' tblproperties('dictionary_exclude'='a[0]')"))
+    assertResult(
+      "DICTIONARY_EXCLUDE column: a[0] does not exist in table or unsupported 
for complex child " +
+      "column. Please check create table statement.")(
+      arrayException.getMessage)
+  }
+
+  test("test block dictionary include for child column") {
+    sql("DROP TABLE IF EXISTS table1")
+    val structException = intercept[MalformedCarbonCommandException](
+      sql(
+        "create table table1 (roll int,a 
struct<b:int,c:string,d:int,e:string,f:struct<g:int," +
+        "h:string,i:int>,j:int>) stored " +
+        "by " +
+        "'carbondata' tblproperties('dictionary_include'='a.b')"))
+    assertResult(
+      "DICTIONARY_INCLUDE column: a.b does not exist in table or unsupported 
for complex child " +
+      "column. Please check create table statement.")(
+      structException.getMessage)
+    sql("DROP TABLE IF EXISTS table1")
+    val arrayException = intercept[MalformedCarbonCommandException](
+      sql(
+        "create table table1 (roll int,a array<int>) stored " +
+        "by " +
+        "'carbondata' tblproperties('dictionary_include'='a[0]')"))
+    assertResult(
+      "DICTIONARY_INCLUDE column: a[0] does not exist in table or unsupported 
for complex child " +
+      "column. Please check create table statement.")(
+      arrayException.getMessage)
+  }
+
+  test("test block compaction") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (roll int,person 
Struct<detail:int,age:string,height:double>) stored " +
+      "by 'carbondata'")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    sql(
+      "load data inpath '" + resourcesPath +
+      "/Struct.csv' into table table1 options('delimiter'=','," +
+      
"'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," +
+      "'complex_delimiter_level_2'='&')")
+    val exception = intercept[UnsupportedOperationException](
+      sql("alter table table1 compact 'major'"))
+    assertResult(
+      "Compaction is unsupported for Table containing Complex Columns")(
+      exception.getMessage)
+    val exception1 = intercept[UnsupportedOperationException](
+      sql("alter table table1 compact 'minor'"))
+    assertResult(
+      "Compaction is unsupported for Table containing Complex Columns")(
+      exception1.getMessage)
+    val exception2 = intercept[UnsupportedOperationException](
+      sql("alter table table1 compact 'custom' where segment.id in (0,1)"))
+    assertResult(
+      "Compaction is unsupported for Table containing Complex Columns")(
+      exception2.getMessage)
+  }
+
+  test("test complex datatype double for encoding") {
+    sql("DROP TABLE IF EXISTS table1")
+    sql(
+      "create table table1 (person struct<height:double>) stored by 
'carbondata'")
+    sql("insert into table1 values('1000000000')")
+    checkExistence(sql("select * from table1"),true,"1.0E9")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
index 6759049..5490f06 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
@@ -83,16 +83,20 @@ class TestLoadDataWithMalformedCarbonCommandException 
extends QueryTest with Bef
     val e = intercept[MalformedCarbonCommandException] {
       buildTableWithNoExistDictExclude()
     }
-    assert(e.getMessage.equals("DICTIONARY_EXCLUDE column: ccc does not exist 
in table. " +
-      "Please check create table statement."))
+    assert(e.getMessage
+      .equals(
+        "DICTIONARY_EXCLUDE column: ccc does not exist in table or unsupported 
for complex child " +
+        "column. Please check create table statement."))
   }
 
   test("test load data with dictionary include columns which no exist in 
table.") {
     val e = intercept[MalformedCarbonCommandException] {
       buildTableWithNoExistDictInclude()
     }
-    assert(e.getMessage.equals("DICTIONARY_INCLUDE column: aaa does not exist 
in table. " +
-      "Please check create table statement."))
+    assert(e.getMessage
+      .equals(
+        "DICTIONARY_INCLUDE column: aaa does not exist in table or unsupported 
for complex child " +
+        "column. Please check create table statement."))
   }
 
   test("test load data with dictionary include is same with dictionary 
exclude") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 44adff3..97fe51a 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -722,16 +722,13 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
         .foreach { dictExcludeCol =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
             val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
-                           " does not exist in table. Please check create 
table statement."
+                           " does not exist in table or unsupported for 
complex child column. " +
+                           "Please check create table statement."
             throw new MalformedCarbonCommandException(errormsg)
           } else {
             val dataType = fields.find(x =>
               x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
-            if (isComplexDimDictionaryExclude(dataType)) {
-              val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex 
datatype column: " +
-                             dictExcludeCol
-              throw new MalformedCarbonCommandException(errormsg)
-            } else if (!isDataTypeSupportedForDictionary_Exclude(dataType)) {
+            if (!isDataTypeSupportedForDictionary_Exclude(dataType)) {
               val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + 
dataType.toLowerCase() +
                              " data type column: " + dictExcludeCol
               throw new MalformedCarbonCommandException(errorMsg)
@@ -750,7 +747,8 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
       dictIncludeCols.foreach { distIncludeCol =>
         if (!fields.exists(x => 
x.column.equalsIgnoreCase(distIncludeCol.trim))) {
           val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
-                         " does not exist in table. Please check create table 
statement."
+                         " does not exist in table or unsupported for complex 
child column. " +
+                         "Please check create table statement."
           throw new MalformedCarbonCommandException(errormsg)
         }
         if (varcharCols.exists(x => x.equalsIgnoreCase(distIncludeCol.trim))) {
@@ -874,7 +872,7 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
    * detects whether datatype is part of dictionary_exclude
    */
   def isDataTypeSupportedForDictionary_Exclude(columnDataType: String): 
Boolean = {
-    val dataTypes = Array("string", "timestamp", "int", "long", "bigint")
+    val dataTypes = Array("string", "timestamp", "int", "long", "bigint", 
"struct", "array")
     dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index dd64423..dff3424 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -115,7 +115,8 @@ class MergeIndexEventListener extends 
OperationEventListener with Logging {
                 String]()
               loadFolderDetailsArray.foreach(loadMetadataDetails => {
                 segmentFileNameMap
-                  .put(loadMetadataDetails.getLoadName, 
loadMetadataDetails.getSegmentFile)
+                  .put(loadMetadataDetails.getLoadName,
+                    String.valueOf(loadMetadataDetails.getLoadStartTime))
               })
               CommonUtil.mergeIndexFiles(sparkSession.sparkContext,
                 validSegmentIds,
@@ -165,7 +166,8 @@ class MergeIndexEventListener extends 
OperationEventListener with Logging {
       .readLoadMetadata(carbonTable.getMetadataPath)
     val segmentFileNameMap: java.util.Map[String, String] = new 
util.HashMap[String, String]()
     loadFolderDetailsArray.foreach(loadMetadataDetails => {
-      segmentFileNameMap.put(loadMetadataDetails.getLoadName, 
loadMetadataDetails.getSegmentFile)
+      segmentFileNameMap
+        .put(loadMetadataDetails.getLoadName, 
String.valueOf(loadMetadataDetails.getLoadStartTime))
     })
     // filter out only the valid segments from the list of compacted segments
     // Example: say compacted segments list contains 0.1, 3.1, 6.1, 0.2.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index a4e52c3..a4adbbb 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -82,6 +82,12 @@ case class CarbonAlterTableCompactionCommand(
       throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }
 
+    if (table.getTableInfo.getFactTable.getListOfColumns.asScala
+      .exists(m => m.getDataType.isComplexType)) {
+      throw new UnsupportedOperationException(
+        "Compaction is unsupported for Table containing Complex Columns")
+    }
+
     if (CarbonUtil.hasAggregationDataMap(table) ||
         (table.isChildDataMap && null == 
operationContext.getProperty(table.getTableName))) {
       // If the compaction request is of 'streaming' type then we need to 
generate loadCommands

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 573ea9a..964407f 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -42,7 +42,8 @@ import org.apache.carbondata.processing.loading.FailureCauses
 private[sql] case class CarbonProjectForUpdateCommand(
     plan: LogicalPlan,
     databaseNameOp: Option[String],
-    tableName: String)
+    tableName: String,
+    columns: List[String])
   extends DataCommand {
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
@@ -59,6 +60,13 @@ private[sql] case class CarbonProjectForUpdateCommand(
       return Seq.empty
     }
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
+    columns.foreach { col =>
+      val dataType = carbonTable.getColumnByName(tableName, 
col).getColumnSchema.getDataType
+      if (dataType.isComplexType) {
+        throw new UnsupportedOperationException("Unsupported operation on 
Complex data type")
+      }
+
+    }
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 06ebc43..ecadf41 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -24,7 +24,7 @@ import 
org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAl
 import org.apache.spark.sql.CarbonExpressions.MatchCastExpression
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, 
UnresolvedFunction, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
AttributeSeq, Cast, Expression, ExprId, NamedExpression, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.{ColumnTableRelation, 
DataMapField, Field}
@@ -354,7 +354,13 @@ object PreAggregateUtil {
         !expression.isInstanceOf[AttributeReference]) {
       newColumnName
     } else {
-      expression.asInstanceOf[AttributeReference].name
+      if (expression.isInstanceOf[GetStructField] || 
expression.isInstanceOf[GetArrayItem]) {
+        throw new UnsupportedOperationException(
+          "Preaggregate is unsupported for ComplexData type column: " +
+          expression.simpleString.replaceAll("#[0-9]*", ""))
+      } else {
+        expression.asInstanceOf[AttributeReference].name
+      }
     }
     createField(columnName,
       dataType,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index f4240e4..0849634 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -34,7 +34,7 @@ private[sql] class StreamingTableStrategy(sparkSession: 
SparkSession) extends Sp
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     plan match {
-      case CarbonProjectForUpdateCommand(_, databaseNameOp, tableName) =>
+      case CarbonProjectForUpdateCommand(_, databaseNameOp, tableName, 
columns) =>
         rejectIfStreamingTable(
           TableIdentifier(tableName, databaseNameOp),
           "Data update")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
index 7300fe9..ae5825d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala
@@ -40,6 +40,13 @@ class CarbonIUDRule extends Rule[LogicalPlan] with 
PredicateHelper {
             val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = 
pList
               .splitAt(pList.size - cols.size)
             val diff = cols.diff(dest.map(_.name.toLowerCase))
+            cols.foreach { col =>
+              val complexExists = "\"name\":\"" + col + "\""
+              if (dest.exists(m => m.dataType.json.contains(complexExists))) {
+                throw new UnsupportedOperationException(
+                  "Unsupported operation on Complex data type")
+              }
+            }
             if (diff.nonEmpty) {
               sys.error(s"Unknown column(s) ${ diff.mkString(",") } in table 
${ table.tableName }")
             }
@@ -47,7 +54,7 @@ class CarbonIUDRule extends Rule[LogicalPlan] with 
PredicateHelper {
             Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ 
source, child)
         }
         CarbonProjectForUpdateCommand(
-          newPlan, table.tableIdentifier.database, table.tableIdentifier.table)
+          newPlan, table.tableIdentifier.database, 
table.tableIdentifier.table, cols)
     }
   }
 }

Reply via email to