Repository: carbondata Updated Branches: refs/heads/master 2fe7758be -> 5ae596b76
[CARBONDATA-1738] [PreAgg] Block direct insert/load on pre-aggregate table This closes #1508 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5ae596b7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5ae596b7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5ae596b7 Branch: refs/heads/master Commit: 5ae596b76f1c15bd78f992bec1c51ae76223f635 Parents: 2fe7758 Author: kunal642 <[email protected]> Authored: Thu Nov 16 17:58:50 2017 +0530 Committer: ravipesala <[email protected]> Committed: Mon Dec 4 16:20:23 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 6 + .../preaggregate/TestPreAggregateLoad.scala | 7 +- .../TestPreAggregateTableSelection.scala | 1 - .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 6 + .../scala/org/apache/spark/sql/CarbonEnv.scala | 4 + .../org/apache/spark/sql/CarbonSession.scala | 11 ++ .../management/CarbonLoadDataCommand.scala | 46 +++--- .../CreatePreAggregateTableCommand.scala | 26 +++- .../preaaggregate/PreAggregateListeners.scala | 65 ++++++-- .../preaaggregate/PreAggregateUtil.scala | 12 +- .../sql/hive/CarbonPreAggregateRules.scala | 155 ++++++++++--------- .../execution/command/CarbonHiveCommands.scala | 4 + .../sql/parser/CarbonSpark2SqlParser.scala | 12 +- .../src/main/spark2.1/CarbonSessionState.scala | 1 + .../src/main/spark2.2/CarbonSessionState.scala | 1 + 15 files changed, 239 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index a264583..43985b2 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -67,6 +67,12 @@ public final class CarbonCommonConstants { public static final String VALIDATE_CARBON_INPUT_SEGMENTS = "validate.carbon.input.segments."; /** + * Whether load/insert command is fired internally or by the user. + * Used to block load/insert on pre-aggregate if fired by user + */ + public static final String IS_INTERNAL_LOAD_CALL = "is.internal.load.call"; + + /** * location of the carbon member, hierarchy and fact files */ @CarbonProperty http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala index 5ac3534..1502c53 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala @@ -162,7 +162,12 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll { Row(2, 27), Row(3, 35), Row(4, 29))) - sql("drop table if exists maintable") + } + + test("test to check if exception is thrown for direct load on pre-aggregate table") { + assert(intercept[RuntimeException] { + sql(s"insert into maintable_preagg_sum values(1, 30)") + }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate table")) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index 1480ae3..c29beec 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.carbondata.integration.spark.testsuite.preaggregate import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 4ad939c..094a629 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -1006,6 +1006,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { "select preAGG() as preAgg, " + query } + lazy val addPreAggLoad: Parser[String] = + SELECT ~> restInput <~ opt(";") ^^ { + case query => + "select preAggLoad() as preAggLoad, " + query + } + protected lazy val primitiveFieldType: Parser[Field] = primitiveTypes ^^ { case e1 => http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 811442b..53b20c2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -57,6 +57,10 @@ class CarbonEnv { // added for handling preaggregate table creation. when user will fire create ddl for // create table we are adding a udf so no need to apply PreAggregate rules. sparkSession.udf.register("preAgg", () => "") + // added to apply proper rules for loading data into pre-agg table. If this UDF is present + // only then the CarbonPreAggregateDataLoadingRules would be applied to split the average + // column to sum and count. + sparkSession.udf.register("preAggLoad", () => "") synchronized { if (!initialized) { // update carbon session parameters , preserve thread parameters http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 89cbbe4..0cb6ca6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -232,6 +232,16 @@ object CarbonSession { ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfo) } + def threadUnset(key: String): Unit = { + val currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo + if (currentThreadSessionInfo != null) { + val currentThreadSessionInfoClone = currentThreadSessionInfo.clone() + val threadParams = currentThreadSessionInfoClone.getThreadParams + CarbonSetCommand.unsetValue(threadParams, key) + ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfoClone) + } + } + private[spark] def updateSessionInfoToCurrentThread(sparkSession: SparkSession): Unit = { val carbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.clone() val currentThreadSessionInfoOrig = ThreadLocalSessionInfo.getCarbonSessionInfo @@ -260,5 +270,6 @@ object CarbonSession { .addListener(classOf[AlterTableDataTypeChangePreEvent], PreAggregateDataTypeChangePreListener) .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener) .addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener) + .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index ff13299..47467df 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -39,7 +39,8 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} import org.apache.carbondata.core.statusmanager.SegmentStatus import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext} +import org.apache.carbondata.events.{LoadTablePreExecutionEvent, OperationListenerBus} import org.apache.carbondata.format import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.exception.NoRetryException @@ -58,8 +59,8 @@ case class CarbonLoadDataCommand( var inputSqlString: String = null, dataFrame: Option[DataFrame] = None, updateModel: Option[UpdateTableModel] = None, - var tableInfoOp: Option[TableInfo] = None) - extends DataCommand { + var tableInfoOp: Option[TableInfo] = None, + internalOptions: Map[String, String] = Map.empty) extends DataCommand { override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -127,6 +128,8 @@ case class CarbonLoadDataCommand( CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser), hadoopConf) } carbonLoadModel.setFactFilePath(factPath) + carbonLoadModel.setAggLoadRequest(internalOptions + .getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) DataLoadingUtil.buildCarbonLoadModel( table, carbonProperty, @@ -135,17 +138,18 @@ case class CarbonLoadDataCommand( carbonLoadModel, hadoopConf ) - val operationContext = new OperationContext - val loadTablePreExecutionEvent: LoadTablePreExecutionEvent = - new LoadTablePreExecutionEvent(sparkSession, - null, - carbonLoadModel, - factPath, - dataFrame.isDefined, - optionsFinal) - OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext) - try{ + + try { + val operationContext = new OperationContext + val loadTablePreExecutionEvent: LoadTablePreExecutionEvent = + LoadTablePreExecutionEvent(sparkSession, + table.getCarbonTableIdentifier, + carbonLoadModel, + factPath, + dataFrame.isDefined, + optionsFinal) + OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext) // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata @@ -345,14 +349,14 @@ case class CarbonLoadDataCommand( } else { (dataFrame, dataFrame) } - - GlobalDictionaryUtil.generateGlobalDictionary( - sparkSession.sqlContext, - carbonLoadModel, - hadoopConf, - dictionaryDataFrame) - CarbonDataRDDFactory.loadCarbonData( - sparkSession.sqlContext, + if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) { + GlobalDictionaryUtil.generateGlobalDictionary( + sparkSession.sqlContext, + carbonLoadModel, + hadoopConf, + dictionaryDataFrame) + } + CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, carbonLoadModel, carbonLoadModel.getTablePath, columnar, http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index a17e745..6cee0e8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.execution.command.preaaggregate +import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand} import org.apache.spark.sql.parser.CarbonSpark2SqlParser @@ -114,17 +116,27 @@ case class CreatePreAggregateTableCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { // load child table if parent table has existing segments val dbName = CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession) - val tableName = tableIdentifier.table - val metastorePath = CarbonTablePath.getMetadataPath( - CarbonEnv.getTablePath( - parentTableIdentifier.database, - parentTableIdentifier.table)(sparkSession)) + val parentCarbonTable = CarbonEnv.getCarbonTable(Some(dbName), + parentTableIdentifier.table)(sparkSession) // This will be used to check if the parent table has any segments or not. If not then no // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT // table. - val loadAvailable = SegmentStatusManager.readLoadMetadata(metastorePath).nonEmpty + val loadAvailable = SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetaDataFilepath) + .nonEmpty if (loadAvailable) { - sparkSession.sql(s"insert into $dbName.$tableName $queryString") + val headers = parentCarbonTable.getTableInfo.getFactTable.getListOfColumns. + asScala.map(_.getColumnName).mkString(",") + val childDataFrame = sparkSession.sql( + new CarbonSpark2SqlParser().addPreAggLoadFunction(queryString)) + CarbonLoadDataCommand(tableIdentifier.database, + tableIdentifier.table, + null, + Nil, + Map("fileheader" -> headers), + isOverwriteTable = false, + dataFrame = Some(childDataFrame), + internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")). + run(sparkSession) } Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index 7bc120b..d314488 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.command.preaaggregate import scala.collection.JavaConverters._ +import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.CarbonSession -import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand +import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.DataMapSchema @@ -39,23 +40,65 @@ object LoadPostAggregateListener extends OperationEventListener { val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable if (table.hasDataMapSchema) { for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) { - CarbonSession - .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + - carbonLoadModel.getDatabaseName + "." + - carbonLoadModel.getTableName, - carbonLoadModel.getSegmentId) - CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + - carbonLoadModel.getDatabaseName + "." + - carbonLoadModel.getTableName, "false") + CarbonSession.threadSet( + CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, + carbonLoadModel.getSegmentId) + CarbonSession.threadSet( + CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName, "false") val childTableName = dataMapSchema.getRelationIdentifier.getTableName val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName - val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY") - sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery") + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( + s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad") + val headers = dataMapSchema.getChildSchema.getListOfColumns. + asScala.map(_.getColumnName).mkString(",") + try { + CarbonLoadDataCommand(Some(childDatabaseName), + childTableName, + null, + Nil, + Map("fileheader" -> headers), + isOverwriteTable = false, + dataFrame = Some(childDataFrame), + internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")). + run(sparkSession) + } finally { + CarbonSession.threadUnset( + CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName) + CarbonSession.threadUnset( + CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + carbonLoadModel.getDatabaseName + "." + + carbonLoadModel.getTableName) + } } } } } +object LoadPreAggregateTablePreListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent] + val carbonLoadModel = loadEvent.carbonLoadModel + val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val isInternalLoadCall = carbonLoadModel.isAggLoadRequest + if (table.isChildDataMap && !isInternalLoadCall) { + throw new UnsupportedOperationException( + "Cannot insert/load data directly into pre-aggregate table") + } + } +} + object PreAggregateDataTypeChangePreListener extends OperationEventListener { /** * Called on a specified event occurrence http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index c50e717..43dc39e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -199,6 +199,13 @@ object PreAggregateUtil { carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName, parentDatabaseName, parentTableId = parentTableId) + case count@Count(Seq(Cast(attr: AttributeReference, _))) => + list += getField(attr.name, + attr.dataType, + count.prettyName, + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) case min@Min(attr: AttributeReference) => list += getField(attr.name, attr.dataType, @@ -253,8 +260,9 @@ object PreAggregateUtil { carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, parentTableName, parentDatabaseName, parentTableId = parentTableId) - case _ => - throw new MalformedCarbonCommandException("Un-Supported Aggregation Type") + case others@_ => + throw new MalformedCarbonCommandException(s"Un-Supported Aggregation Type: ${ + others.prettyName}") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index baa9008..2875817 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CarbonException @@ -79,6 +78,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAgg") => needAnalysis = false al + case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAggLoad") => + needAnalysis = false + al // in case of query if any unresolve alias is present then wait for plan to be resolved // return the same plan as we can tranform the plan only when everything is resolved case unresolveAlias@UnresolvedAlias(_, _) => @@ -752,6 +754,80 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } +object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression] + plan transform { + case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) => + aExp.foreach { + case alias: Alias => + validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias) + case _: UnresolvedAlias => + case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr) + } + aggregate.copy(aggregateExpressions = validExpressionsMap.values.toSeq) + case plan: LogicalPlan => plan + } + } + + /** + * This method will split the avg column into sum and count and will return a sequence of tuple + * of unique name, alias + * + */ + private def validateAggregateFunctionAndGetAlias(alias: Alias): Seq[(String, + NamedExpression)] = { + alias match { + case udf@Alias(_: ScalaUDF, name) => + Seq((name, udf)) + case alias@Alias(attrExpression: AggregateExpression, _) => + attrExpression.aggregateFunction match { + case Sum(attr: AttributeReference) => + (attr.name + "_sum", alias) :: Nil + case Sum(MatchCast(attr: AttributeReference, _)) => + (attr.name + "_sum", alias) :: Nil + case Count(Seq(attr: AttributeReference)) => + (attr.name + "_count", alias) :: Nil + case Count(Seq(MatchCast(attr: AttributeReference, _))) => + (attr.name + "_count", alias) :: Nil + case Average(attr: AttributeReference) => + Seq((attr.name + "_sum", Alias(attrExpression. + copy(aggregateFunction = Sum(attr), + resultId = NamedExpression.newExprId), attr.name + "_sum")()), + (attr.name, Alias(attrExpression. + copy(aggregateFunction = Count(attr), + resultId = NamedExpression.newExprId), attr.name + "_count")())) + case Average(cast@MatchCast(attr: AttributeReference, _)) => + Seq((attr.name + "_sum", Alias(attrExpression. + copy(aggregateFunction = Sum(cast), + resultId = NamedExpression.newExprId), + attr.name + "_sum")()), + (attr.name, Alias(attrExpression. + copy(aggregateFunction = Count(cast), resultId = + NamedExpression.newExprId), attr.name + "_count")())) + case _ => Seq(("", alias)) + } + + } + } + + /** + * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not. + * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is + * valid. + * + * @param namedExpression + * @return + */ + private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = { + val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias]) + filteredExpressions.exists { expr => + !expr.name.equalsIgnoreCase("PreAgg") && expr.name.equalsIgnoreCase("preAggLoad") + } + } +} + /** * Insert into carbon table from other source */ @@ -769,23 +845,14 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceHadoopRelation, - child: LogicalPlan) - : LogicalPlan = { + child: LogicalPlan): LogicalPlan = { if (relation.carbonRelation.output.size > CarbonCommonConstants .DEFAULT_MAX_NUMBER_OF_COLUMNS) { CarbonException.analysisException("Maximum number of columns supported:" + s"${CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS}") } - val isAggregateTable = !relation.carbonRelation.carbonTable.getTableInfo - .getParentRelationIdentifiers.isEmpty - // transform logical plan if the load is for aggregate table. - val childPlan = if (isAggregateTable) { - transformAggregatePlan(child) - } else { - child - } - if (childPlan.output.size >= relation.carbonRelation.output.size) { - val newChildOutput = childPlan.output.zipWithIndex.map { columnWithIndex => + if (child.output.size >= relation.carbonRelation.output.size) { + val newChildOutput = child.output.zipWithIndex.map { columnWithIndex => columnWithIndex._1 match { case attr: Alias => Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId) @@ -795,7 +862,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi } } val version = sparkSession.version - val newChild: LogicalPlan = if (newChildOutput == childPlan.output) { + val newChild: LogicalPlan = if (newChildOutput == child.output) { if (version.startsWith("2.1")) { CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan] } else if (version.startsWith("2.2")) { @@ -804,7 +871,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi throw new UnsupportedOperationException(s"Spark version $version is not supported") } } else { - Project(newChildOutput, childPlan) + Project(newChildOutput, child) } val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p) @@ -816,63 +883,5 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi } } - /** - * Transform the logical plan with average(col1) aggregation type to sum(col1) and count(col1). - * - * @param logicalPlan - * @return - */ - private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = { - val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression] - logicalPlan transform { - case aggregate@Aggregate(_, aExp, _) => - aExp.foreach { - case alias: Alias => - validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias) - case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr) - } - aggregate.copy(aggregateExpressions = validExpressionsMap.values.toSeq) - case plan: LogicalPlan => plan - } - } - - /** - * This method will split the avg column into sum and count and will return a sequence of tuple - * of unique name, alias - * - */ - def validateAggregateFunctionAndGetAlias(alias: Alias): Seq[(String, NamedExpression)] = { - alias match { - case alias@Alias(attrExpression: AggregateExpression, _) => - attrExpression.aggregateFunction match { - case Sum(attr: AttributeReference) => - (attr.name + "_sum", alias) :: Nil - case Sum(Cast(attr: AttributeReference, _)) => - (attr.name + "_sum", alias) :: Nil - case Count(Seq(attr: AttributeReference)) => - (attr.name + "_count", alias) :: Nil - case Count(Seq(Cast(attr: AttributeReference, _))) => - (attr.name + "_count", alias) :: Nil - case Average(attr: AttributeReference) => - Seq((attr.name + "_sum", Alias(attrExpression - .copy(aggregateFunction = Sum(attr), - resultId = NamedExpression.newExprId), attr.name + "_sum")()), - (attr.name, Alias(attrExpression - .copy(aggregateFunction = Count(attr), - resultId = NamedExpression.newExprId), attr.name + "_count")())) - case Average(cast@Cast(attr: AttributeReference, _)) => - Seq((attr.name + "_sum", Alias(attrExpression - .copy(aggregateFunction = Sum(cast), - resultId = NamedExpression.newExprId), - attr.name + "_sum")()), - (attr.name, Alias(attrExpression - .copy(aggregateFunction = Count(cast), - resultId = NamedExpression.newExprId), attr.name + "_count")())) - case _ => Seq(("", alias)) - } - - } - } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala index b358f83..6761e92 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala @@ -96,6 +96,10 @@ object CarbonSetCommand { sessionParams.addProperty(key.toLowerCase(), value) } } + + def unsetValue(sessionParams: SessionParams, key: String): Unit = { + sessionParams.removeProperty(key) + } } case class CarbonResetCommand() http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 6f7b89a..b01c6d9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -569,8 +569,16 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { def addPreAggFunction(sql: String): String = { addPreAgg(new lexical.Scanner(sql.toLowerCase)) match { case Success(query, _) => query - case failureOrError => throw new MalformedCarbonCommandException( - s"Unsupported query") + case _ => + throw new MalformedCarbonCommandException(s"Unsupported query") + } + } + + def addPreAggLoadFunction(sql: String): String = { + addPreAggLoad(new lexical.Scanner(sql.toLowerCase)) match { + case Success(query, _) => query + case _ => + throw new MalformedCarbonCommandException(s"Unsupported query") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala index 9f66737..911c25d 100644 --- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala @@ -156,6 +156,7 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp catalog.OrcConversions :: CarbonPreInsertionCasts(sparkSession) :: CarbonPreAggregateQueryRules(sparkSession) :: + CarbonPreAggregateDataLoadingRules :: CarbonIUDAnalysisRule(sparkSession) :: AnalyzeCreateTable(sparkSession) :: PreprocessTableInsertion(conf) :: http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala index b3792cd..87aebc0 100644 --- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala @@ -248,6 +248,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, + ctx.locationSpec(), Option(ctx.STRING()).map(string), ctx.AS) } else {
