Repository: carbondata Updated Branches: refs/heads/master 29bae4d28 -> c08fe933c
[CARBONDATA-1881] Insert overwrite value for pre-aggregate load was incorrect. While loading the pre-aggregate table, the insert-overwrite flag was always set to false. Use the insert-overwrite value set for the main table when loading the pre-aggregate table. This closes #1639 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c08fe933 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c08fe933 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c08fe933 Branch: refs/heads/master Commit: c08fe933c1479b05b41c3ef9c3b342c8de674526 Parents: 29bae4d Author: kunal642 <[email protected]> Authored: Mon Dec 11 11:20:43 2017 +0530 Committer: kumarvishal <[email protected]> Committed: Wed Dec 13 14:13:41 2017 +0530 ---------------------------------------------------------------------- .../preaggregate/TestPreAggregateLoad.scala | 16 ++++++++++++++++ .../carbondata/spark/rdd/CarbonDataRDDFactory.scala | 1 + .../CreatePreAggregateTableCommand.scala | 1 + .../preaaggregate/PreAggregateListeners.scala | 3 +++ .../command/preaaggregate/PreAggregateUtil.scala | 3 ++- 5 files changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c08fe933/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala index 569439c..6a5f221 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala @@ -187,4 +187,20 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52)) } + test("test if pre-aagregate is overwritten if main table is inserted with insert overwrite") { + sql("DROP TABLE IF EXISTS maintable") + sql( + """ + | CREATE TABLE maintable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql( + s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id""" + .stripMargin) + sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)") + sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)") + sql(s"insert overwrite table maintable values(1, 'xyz', 'delhi', 29)") + checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 29)) + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c08fe933/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 8f4af1b..7955e71 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -503,6 +503,7 @@ object CarbonDataRDDFactory { sqlContext.sparkSession, carbonTable.getCarbonTableIdentifier, carbonLoadModel) + operationContext.setProperty("isOverwrite", overwriteTable) OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) val done = updateTableStatus(status, 
carbonLoadModel, loadStatus, overwriteTable) if (!done) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c08fe933/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 9a84450..8c02f3b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -143,6 +143,7 @@ case class CreatePreAggregateTableCommand( queryString, segmentToLoad = "*", validateSegments = true, + isOverwrite = false, sparkSession = sparkSession) } Seq.empty http://git-wip-us.apache.org/repos/asf/carbondata/blob/c08fe933/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index 747e447..1e5b305 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -70,12 +70,15 @@ object LoadPostAggregateListener extends OperationEventListener { databasename) } } + val isOverwrite = + 
operationContext.getProperty("isOverwrite").asInstanceOf[Boolean] PreAggregateUtil.startDataLoadForDataMap( table, TableIdentifier(childTableName, Some(childDatabaseName)), childSelectQuery, carbonLoadModel.getSegmentId, validateSegments = false, + isOverwrite, sparkSession) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c08fe933/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 5ad5308..81ccbd2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -504,6 +504,7 @@ object PreAggregateUtil { queryString: String, segmentToLoad: String, validateSegments: Boolean, + isOverwrite: Boolean, sparkSession: SparkSession): Unit = { CarbonSession.threadSet( CarbonCommonConstants.CARBON_INPUT_SEGMENTS + @@ -525,7 +526,7 @@ object PreAggregateUtil { null, Nil, Map("fileheader" -> headers), - isOverwriteTable = false, + isOverwriteTable = isOverwrite, dataFrame = Some(dataFrame), internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")). run(sparkSession)
