[CARBONDATA-2057] Support specifying path when creating pre-aggregate table
When creating a datamap of a pre-aggregate table, the user should be able to specify its persistence location. This can be done with the 'path' property in dmproperties, e.g. dmproperties ('path'='./_pre-agg_test'). This closes #1835 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9882a74c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9882a74c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9882a74c Branch: refs/heads/carbonstore Commit: 9882a74c8843085cc13756a03025f163829e194e Parents: aac7af7 Author: Jacky Li <[email protected]> Authored: Fri Jan 19 14:48:36 2018 +0800 Committer: ravipesala <[email protected]> Committed: Fri Jan 19 15:49:01 2018 +0530 ---------------------------------------------------------------------- .../testsuite/datamap/TestDataMapCommand.scala | 27 +++++++++++++ .../datamap/CarbonDropDataMapCommand.scala | 40 ++++++++------------ .../CreatePreAggregateTableCommand.scala | 5 ++- 3 files changed, 47 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9882a74c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala index d0a342b..f3458e2 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala @@ -17,6 +17,8 @@ package org.apache.carbondata.spark.testsuite.datamap +import java.io.{File, FilenameFilter} + import org.apache.spark.sql.Row import 
org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -24,6 +26,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { @@ -207,6 +210,30 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) } + test("create pre-agg table with path") { + sql("drop table if exists main_preagg") + sql("drop table if exists main ") + val path = "./_pre-agg_test" + sql("create table main(year int,month int,name string,salary int) stored by 'carbondata' tblproperties('sort_columns'='month,year,name')") + sql("insert into main select 10,11,'amy',12") + sql("insert into main select 10,11,'amy',14") + sql("create datamap preagg on table main " + + "using 'preaggregate' " + + s"dmproperties ('path'='$path') " + + "as select name,avg(salary) from main group by name") + assertResult(true)(new File(path).exists()) + assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}") + .list(new FilenameFilter { + override def accept(dir: File, name: String): Boolean = { + name.contains(CarbonCommonConstants.FACT_FILE_EXT) + } + }).length > 0) + checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0)) + checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2)) + sql("drop datamap preagg on table main") + assertResult(false)(new File(path).exists()) + sql("drop table main") + } override def afterAll { sql("DROP TABLE IF EXISTS maintable") http://git-wip-us.apache.org/repos/asf/carbondata/blob/9882a74c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index 59aa322..e545b0b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -51,10 +51,11 @@ case class CarbonDropDataMapCommand( tableName: String) extends AtomicRunnableCommand { + var commandToRun: CarbonDropTableCommand = _ + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) - val identifier = TableIdentifier(tableName, Option(dbName)) val locksToBeAcquired = List(LockUsage.METADATA_LOCK) val carbonEnv = CarbonEnv.getInstance(sparkSession) val catalog = carbonEnv.carbonMetastore @@ -68,18 +69,12 @@ case class CarbonDropDataMapCommand( lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock) } LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]") - var carbonTable: Option[CarbonTable] = - catalog.getTableFromMetadataCache(dbName, tableName) - if (carbonTable.isEmpty) { - try { - carbonTable = Some(catalog.lookupRelation(identifier)(sparkSession) - .asInstanceOf[CarbonRelation].metaData.carbonTable) - } catch { - case ex: NoSuchTableException => - if (!ifExistsSet) { - throw ex - } - } + val carbonTable: Option[CarbonTable] = try { + Some(CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)) + } catch { + case ex: NoSuchTableException => + if (!ifExistsSet) throw ex + else None } if (carbonTable.isDefined && 
carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) { val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex. @@ -104,12 +99,12 @@ case class CarbonDropDataMapCommand( tableName))(sparkSession) if (dataMapSchema.isDefined) { if (dataMapSchema.get._1.getRelationIdentifier != null) { - CarbonDropTableCommand( + commandToRun = CarbonDropTableCommand( ifExistsSet = true, Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName), dataMapSchema.get._1.getRelationIdentifier.getTableName, - dropChildTable = true - ).processMetadata(sparkSession) + dropChildTable = true) + commandToRun.processMetadata(sparkSession) } } // fires the event after dropping datamap from main table schema @@ -143,14 +138,11 @@ case class CarbonDropDataMapCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { // delete the table folder - val tableIdentifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession) - DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName) - CarbonDropTableCommand( - ifExistsSet = true, - databaseNameOp, - dataMapName, - dropChildTable = true - ).processData(sparkSession) + if (commandToRun != null) { + DataMapStoreManager.getInstance().clearDataMap( + commandToRun.carbonTable.getAbsoluteTableIdentifier, dataMapName) + commandToRun.processData(sparkSession) + } Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9882a74c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 8b11548..933bf91 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -105,8 +105,11 @@ case class CreatePreAggregateTableCommand( } tableModel.parentTable = Some(parentTable) tableModel.dataMapRelation = Some(fieldRelationMap) - val tablePath = + val tablePath = if (dmProperties.contains("path")) { + dmProperties("path") + } else { CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession) + } CarbonCreateTableCommand(TableNewProcessor(tableModel), tableModel.ifNotExistsSet, Some(tablePath)).run(sparkSession)
