Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1508#discussion_r153723385
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
---
@@ -49,13 +50,54 @@ object LoadPostAggregateListener extends
OperationEventListener {
carbonLoadModel.getTableName, "false")
val childTableName =
dataMapSchema.getRelationIdentifier.getTableName
val childDatabaseName =
dataMapSchema.getRelationIdentifier.getDatabaseName
- val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT
QUERY")
- sparkSession.sql(s"insert into $childDatabaseName.$childTableName
$selectQuery")
+ val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
+ .addPreAggLoadFunction(s"${
dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } "))
+ .drop("preAggLoad")
+ val headers =
dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
+ .mkString(",")
+ try {
+ LoadTableCommand(Some(childDatabaseName),
+ childTableName,
+ null,
+ Nil,
+ Map("fileheader" -> headers),
+ isOverwriteTable = false,
+ dataFrame = Some(childDataFrame),
+ internalOptions =
Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"))
+ .run(sparkSession)
+ } finally {
+
CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ carbonLoadModel.getDatabaseName + "." +
+ carbonLoadModel.getTableName)
+
CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ carbonLoadModel.getDatabaseName + "." +
+ carbonLoadModel.getTableName)
+ }
}
}
}
}
+object LoadPreAggregateTablePreListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override def onEvent(event: Event, operationContext: OperationContext):
Unit = {
+ val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent]
+ val carbonLoadModel = loadEvent.carbonLoadModel
+ val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val isInternalLoadCall = carbonLoadModel.isAggLoadRequest
+ if (table.isChildDataMap && !isInternalLoadCall) {
+ throw new UnsupportedOperationException(
+ "Cannot insert/load data directly into pre-aggregate table")
+ }
+
--- End diff --
Please remove this empty line (the blank line added at the end of the hunk above).
---