Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1508#discussion_r153723385
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
 ---
    @@ -49,13 +50,54 @@ object LoadPostAggregateListener extends 
OperationEventListener {
                                     carbonLoadModel.getTableName, "false")
             val childTableName = 
dataMapSchema.getRelationIdentifier.getTableName
             val childDatabaseName = 
dataMapSchema.getRelationIdentifier.getDatabaseName
    -        val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT 
QUERY")
    -        sparkSession.sql(s"insert into $childDatabaseName.$childTableName 
$selectQuery")
    +        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
    +          .addPreAggLoadFunction(s"${ 
dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } "))
    +          .drop("preAggLoad")
    +        val headers = 
dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName)
    +          .mkString(",")
    +        try {
    +          LoadTableCommand(Some(childDatabaseName),
    +            childTableName,
    +            null,
    +            Nil,
    +            Map("fileheader" -> headers),
    +            isOverwriteTable = false,
    +            dataFrame = Some(childDataFrame),
    +            internalOptions = 
Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"))
    +            .run(sparkSession)
    +        } finally {
    +          
CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
    +                                    carbonLoadModel.getDatabaseName + "." +
    +                                    carbonLoadModel.getTableName)
    +          
CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
    +                                    carbonLoadModel.getDatabaseName + "." +
    +                                    carbonLoadModel.getTableName)
    +        }
           }
         }
       }
     }
     
    +object LoadPreAggregateTablePreListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override def onEvent(event: Event, operationContext: OperationContext): 
Unit = {
    +    val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent]
    +    val carbonLoadModel = loadEvent.carbonLoadModel
    +    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
    +    val isInternalLoadCall = carbonLoadModel.isAggLoadRequest
    +    if (table.isChildDataMap && !isInternalLoadCall) {
    +      throw new UnsupportedOperationException(
    +        "Cannot insert/load data directly into pre-aggregate table")
    +    }
    +
    --- End diff --
    
    remove line


---

Reply via email to