Repository: carbondata Updated Branches: refs/heads/master 0bbfa8597 -> 0e8707a60
[CARBONDATA-1737] [CARBONDATA-1760] [PreAgg] Fixed partial load issue when the user has set segments to access on the parent table. Analysis: Partial load was happening on the pre-aggregate table when the user had set segments to access for the parent table. Solution: Set segments to access to * before firing the load for the child table. This closes #1613 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0e8707a6 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0e8707a6 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0e8707a6 Branch: refs/heads/master Commit: 0e8707a60cdef2604dc78c61e53bd349a7e90d28 Parents: 0bbfa85 Author: kunal642 <[email protected]> Authored: Tue Dec 5 17:01:46 2017 +0530 Committer: ravipesala <[email protected]> Committed: Wed Dec 6 21:51:23 2017 +0530 ---------------------------------------------------------------------- .../preaggregate/TestPreAggregateLoad.scala | 17 +++++++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 22 ++++++--- .../CreatePreAggregateTableCommand.scala | 40 +++++++-------- .../preaaggregate/PreAggregateListeners.scala | 41 +++------------ .../preaaggregate/PreAggregateUtil.scala | 52 +++++++++++++++++++- 5 files changed, 106 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala index 1502c53..569439c 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala @@ -170,4 +170,21 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll { }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate table")) } + test("test whether all segments are loaded into pre-aggregate table if segments are set on main table") { + sql("DROP TABLE IF EXISTS maintable") + sql( + """ + | CREATE TABLE maintable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)") + sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)") + sql("set carbon.input.segments.default.maintable=0") + sql( + s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id""" + .stripMargin) + sql("reset") + checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52)) + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 67d75bd..1fa838c 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -359,14 +359,20 @@ class CarbonScanRDD( } val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo if (carbonSessionInfo != null) { - 
CarbonTableInputFormat.setAggeragateTableSegments(conf, carbonSessionInfo.getSessionParams - .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + - identifier.getCarbonTableIdentifier.getDatabaseName + "." + - identifier.getCarbonTableIdentifier.getTableName, "")) - CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams - .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + - identifier.getCarbonTableIdentifier.getDatabaseName + "." + - identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean) + val segmentsToScan = carbonSessionInfo.getSessionParams.getProperty( + CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + identifier.getCarbonTableIdentifier.getDatabaseName + "." + + identifier.getCarbonTableIdentifier.getTableName) + if (segmentsToScan != null) { + CarbonTableInputFormat.setAggeragateTableSegments(conf, segmentsToScan) + } + val validateSegments = carbonSessionInfo.getSessionParams.getProperty( + CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + identifier.getCarbonTableIdentifier.getDatabaseName + "." 
+ + identifier.getCarbonTableIdentifier.getTableName) + if (validateSegments != null) { + CarbonTableInputFormat.setValidateSegmentsToAccess(conf, validateSegments.toBoolean) + } } format } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 6cee0e8..1ebf511 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -23,13 +23,13 @@ import scala.collection.mutable import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand -import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand} +import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.path.CarbonTablePath /** * Below command class will be used to create pre-aggregate table @@ -69,7 +69,8 @@ case class CreatePreAggregateTableCommand( None) val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan) - 
assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) + assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table), + "Parent table name is different in select and create") // updating the relation identifier, this will be stored in child table // which can be used during dropping of pre-aggreate table as parent table will // also get updated @@ -102,14 +103,11 @@ case class CreatePreAggregateTableCommand( override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { // drop child table and undo the change in table info of main table - CarbonDropTableCommand( + CarbonDropDataMapCommand( + dataMapName, ifExistsSet = true, - tableIdentifier.database, - tableIdentifier.table - ).run(sparkSession) - - // TODO: undo the change in table info of main table - + parentTableIdentifier.database, + parentTableIdentifier.table).run(sparkSession) Seq.empty } @@ -124,19 +122,15 @@ case class CreatePreAggregateTableCommand( val loadAvailable = SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetaDataFilepath) .nonEmpty if (loadAvailable) { - val headers = parentCarbonTable.getTableInfo.getFactTable.getListOfColumns. - asScala.map(_.getColumnName).mkString(",") - val childDataFrame = sparkSession.sql( - new CarbonSpark2SqlParser().addPreAggLoadFunction(queryString)) - CarbonLoadDataCommand(tableIdentifier.database, - tableIdentifier.table, - null, - Nil, - Map("fileheader" -> headers), - isOverwriteTable = false, - dataFrame = Some(childDataFrame), - internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")). - run(sparkSession) + // Passing segmentToLoad as * because we want to load all the segments into the + // pre-aggregate table even if the user has set some segments on the parent table. 
+ PreAggregateUtil.startDataLoadForDataMap( + parentCarbonTable, + tableIdentifier, + queryString, + segmentToLoad = "*", + validateSegments = true, + sparkSession = sparkSession) } Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index d314488..90b728d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.CarbonSession +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -40,41 +41,15 @@ object LoadPostAggregateListener extends OperationEventListener { val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable if (table.hasDataMapSchema) { for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) { - CarbonSession.threadSet( - CarbonCommonConstants.CARBON_INPUT_SEGMENTS + - carbonLoadModel.getDatabaseName + "." + - carbonLoadModel.getTableName, - carbonLoadModel.getSegmentId) - CarbonSession.threadSet( - CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + - carbonLoadModel.getDatabaseName + "." 
+ - carbonLoadModel.getTableName, "false") val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName - val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( - s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad") - val headers = dataMapSchema.getChildSchema.getListOfColumns. - asScala.map(_.getColumnName).mkString(",") - try { - CarbonLoadDataCommand(Some(childDatabaseName), - childTableName, - null, - Nil, - Map("fileheader" -> headers), - isOverwriteTable = false, - dataFrame = Some(childDataFrame), - internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")). - run(sparkSession) - } finally { - CarbonSession.threadUnset( - CarbonCommonConstants.CARBON_INPUT_SEGMENTS + - carbonLoadModel.getDatabaseName + "." + - carbonLoadModel.getTableName) - CarbonSession.threadUnset( - CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + - carbonLoadModel.getDatabaseName + "." 
+ - carbonLoadModel.getTableName) - } + PreAggregateUtil.startDataLoadForDataMap( + table, + TableIdentifier(childTableName, Some(childDatabaseName)), + dataMapSchema.getProperties.get("CHILD_SELECT QUERY"), + carbonLoadModel.getSegmentId, + validateSegments = false, + sparkSession) } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 43dc39e..95a711e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.preaaggregate import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, DataFrame, SparkSession} import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation} @@ -31,12 +31,14 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.types.DataType import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} +import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand +import 
org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} -import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.format.TableInfo import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -493,4 +495,50 @@ object PreAggregateUtil { updatedPlan } + /** + * This method will start load process on the data map + */ + def startDataLoadForDataMap( + parentCarbonTable: CarbonTable, + dataMapIdentifier: TableIdentifier, + queryString: String, + segmentToLoad: String, + validateSegments: Boolean, + sparkSession: SparkSession): Unit = { + CarbonSession.threadSet( + CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + parentCarbonTable.getDatabaseName + "." + + parentCarbonTable.getTableName, + segmentToLoad) + CarbonSession.threadSet( + CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + parentCarbonTable.getDatabaseName + "." + + parentCarbonTable.getTableName, validateSegments.toString) + val headers = parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala. + find(_.getChildSchema.getTableName.equals(dataMapIdentifier.table)).get.getChildSchema. 
+ getListOfColumns.asScala.map(_.getColumnName).mkString(",") + val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( + queryString)).drop("preAggLoad") + try { + CarbonLoadDataCommand(dataMapIdentifier.database, + dataMapIdentifier.table, + null, + Nil, + Map("fileheader" -> headers), + isOverwriteTable = false, + dataFrame = Some(dataFrame), + internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")). + run(sparkSession) + } finally { + CarbonSession.threadUnset( + CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + parentCarbonTable.getDatabaseName + "." + + parentCarbonTable.getTableName) + CarbonSession.threadUnset( + CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + parentCarbonTable.getDatabaseName + "." + + parentCarbonTable.getTableName) + } + } + }
