Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178727711
--- Diff:
integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
---
@@ -0,0 +1,488 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv,
Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class StandardPartitionWithPreaggregateTestCase extends QueryTest with
BeforeAndAfterAll {
+
// Path of the sample CSV fixture loaded into the partitioned main table.
val testData: String = s"$resourcesPath/sample.csv"
+
/**
 * Creates a fresh `partition_preaggregate` database containing two
 * partitioned carbon tables (`par`, `maintable`) and loads the sample
 * CSV into `maintable` before any test runs.
 */
override def beforeAll(): Unit = {
  sql("drop database if exists partition_preaggregate cascade")
  sql("create database partition_preaggregate")
  sql("use partition_preaggregate")
  // Table partitioned on the string column `city`.
  sql(
    """
      | CREATE TABLE par(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
      | STORED BY 'org.apache.carbondata.format'
    """.stripMargin)
  // Table partitioned on the int column `age`; this one gets the sample data.
  sql(
    """
      | CREATE TABLE maintable(id int, name string, city string) partitioned by (age int)
      | STORED BY 'org.apache.carbondata.format'
    """.stripMargin)
  sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
}
+
/** Drops the test database and switches the session back to `default`. */
override def afterAll(): Unit = {
  sql("drop database if exists partition_preaggregate cascade")
  sql("use default")
}
+
// Aggregate table on a partitioned table where the partition column appears
// only inside the aggregation expression: the datamap table must NOT be
// created as a Hive-partitioned table.
test("test preaggregate table creation on partition table with partition col as aggregation") {
  sql("create datamap p1 on table par using 'preaggregate' as select id, sum(city) from par group by id")
  val aggTable = CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p1")(sqlContext.sparkSession)
  assert(!aggTable.isHivePartitionTable)
}
+
// Aggregate table where the partition column appears both in the projection
// and in an aggregation: the datamap table SHOULD inherit Hive partitioning.
test("test preaggregate table creation on partition table with partition col as projection") {
  sql("create datamap p2 on table par using 'preaggregate' as select id, city, min(city) from par group by id,city ")
  val aggTable = CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p2")(sqlContext.sparkSession)
  assert(aggTable.isHivePartitionTable)
}
+
// Aggregate table where the partition column is part of the GROUP BY:
// the datamap table SHOULD inherit Hive partitioning.
test("test preaggregate table creation on partition table with partition col as group by") {
  sql("create datamap p3 on table par using 'preaggregate' as select id, max(city) from par group by id,city ")
  val aggTable = CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p3")(sqlContext.sparkSession)
  assert(aggTable.isHivePartitionTable)
}
+
// Aggregate table that does not reference the partition column at all:
// the datamap table must NOT be Hive-partitioned.
test("test preaggregate table creation on partition table without partition column") {
  sql("create datamap p4 on table par using 'preaggregate' as select name, count(id) from par group by name ")
  val aggTable = CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p4")(sqlContext.sparkSession)
  assert(!aggTable.isHivePartitionTable)
}
+
// Data correctness when the partition column participates in the datamap:
// the aggregate table content must match, and the analyzed query plan must
// be rewritten to hit the aggregate table.
test("test data correction in aggregate table when partition column is used") {
  sql("create datamap p1 on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id, age")
  val expectedRows = Seq(
    Row(1, 31, 31),
    Row(2, 27, 27),
    Row(3, 70, 35),
    Row(4, 26, 26),
    Row(4, 29, 29))
  checkAnswer(sql("select * from maintable_p1"), expectedRows)
  // Verify the optimizer redirects the user query to the aggregate table.
  preAggTableValidator(sql("select id, sum(age) from maintable group by id, age").queryExecution.analyzed, "maintable_p1")
  sql("drop datamap p1 on table maintable")
}
+
// Data correctness when the partition column is NOT part of the datamap:
// content and plan-rewrite are validated the same way as above.
test("test data correction in aggregate table when partition column is not used") {
  sql("create datamap p2 on table maintable using 'preaggregate' as select id, max(age) from maintable group by id")
  val expectedRows = Seq(
    Row(1, 31),
    Row(2, 27),
    Row(3, 35),
    Row(4, 29))
  checkAnswer(sql("select * from maintable_p2"), expectedRows)
  // Verify the optimizer redirects the user query to the aggregate table.
  preAggTableValidator(sql("select id, max(age) from maintable group by id").queryExecution.analyzed, "maintable_p2")
  sql("drop datamap p2 on table maintable")
}
+
// Insert overwrite into the SAME partition (year=2014, month=1, day=1):
// the new row replaces the old one in both the main table and the
// aggregate table.
test("test data correction with insert overwrite") {
  sql("drop table if exists partitionone")
  sql(
    """
      | CREATE TABLE if not exists partitionone (empname String)
      | PARTITIONED BY (year int, month int,day int)
      | STORED BY 'org.apache.carbondata.format'
    """.stripMargin)
  sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
  sql("insert into partitionone values('k',2014,1,1)")
  sql("insert overwrite table partitionone values('v',2014,1,1)")
  // Only the overwriting row should survive in the shared partition.
  checkAnswer(sql("select * from partitionone"), Seq(Row("v", 2014, 1, 1)))
  checkAnswer(sql("select * from partitionone_p1"), Seq(Row("v", 2014, 2014, 1, 1)))
}
+
+ test("test data correction with insert overwrite on different value") {
+ sql("drop table if exists partitionone")
+ sql(
+ """
+ | CREATE TABLE if not exists partitionone (empname String)
+ | PARTITIONED BY (year int, month int,day int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql("create datamap p1 on table partitionone using 'preaggregate' as
select empname, sum(year) from partitionone group by empname, year, month,day")
+ sql("insert into partitionone values('k',2014,1,1)")
+ sql("insert overwrite table partitionone values('v',2015,1,1)")
+ checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1),
Row("v",2015,1,1)))
--- End diff --
After the insert overwrite operation, only one row — Row("v",2015,1,1) — should be
returned. Is my understanding correct?
---