Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1605#discussion_r154972340
--- Diff:
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
---
@@ -212,4 +221,56 @@ case class CarbonAlterTableCompactionCommand(
}
}
}
+
+ private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
+ sparkSession: SparkSession): Unit = {
+ val carbonTable =
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val loadMetaDataDetails = CarbonDataMergerUtil
+ .identifySegmentsToBeMerged(carbonLoadModel,
+ CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR),
+ carbonLoadModel.getLoadMetadataDetails,
+ carbonLoadModel.getCompactionType)
+ val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
+ if (segments.nonEmpty) {
+ CarbonSession
+ .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ carbonLoadModel.getDatabaseName + "." +
+ carbonLoadModel.getTableName,
+ segments.mkString(","))
+
CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ carbonLoadModel.getDatabaseName + "." +
+ carbonLoadModel.getTableName, "false")
+ val headers =
carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
+ .map(_.getColumnName).mkString(",")
+ val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
+ .addPreAggLoadFunction(PreAggregateUtil
+
.createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad")
+ try {
+ CarbonLoadDataCommand(Some(carbonTable.getDatabaseName),
+ carbonTable.getTableName,
+ null,
+ Nil,
+ Map("fileheader" -> headers),
+ isOverwriteTable = false,
+ dataFrame = Some(childDataFrame),
+ internalOptions =
Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
+ "compactionType" ->
carbonLoadModel.getCompactionType.toString)).run(sparkSession)
+ } finally {
+ // check if any other segments needs compaction on in case of
MINOR_COMPACTION.
+ // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged
to 0.2 if threshhold
+ // allows it.
+ if
(!carbonLoadModel.getCompactionType.equals(CompactionType.MAJOR)) {
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ startCompactionForDataMap(carbonLoadModel, sparkSession)
--- End diff --
We should avoid the recursive call, you can invoke it in caller of this
function.
---