Repository: carbondata Updated Branches: refs/heads/master 6dcf4eb95 -> 2304303ca
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index c602b0a..851b851 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -38,7 +38,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableSchema} import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.format.TableInfo import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -541,4 +541,21 @@ object PreAggregateUtil { } } + def createChildSelectQuery(tableSchema: TableSchema, databaseName: String): String = { + val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String] + val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String] + tableSchema.getListOfColumns.asScala.foreach { + a => if (a.getAggFunction.nonEmpty) { + aggregateColumns += s"${a.getAggFunction match { + case "count" => "sum" + case _ => a.getAggFunction}}(${a.getColumnName})" + } else { + groupingExpressions += a.getColumnName + } + } + s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",") + } from $databaseName.${ tableSchema.getTableName } group by ${ + groupingExpressions.mkString(",") }" + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 3922e76..d7e9867 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -274,9 +274,16 @@ public final class CarbonLoaderUtil { if (loadStartEntry) { String segmentId = String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray)); - newMetaEntry.setLoadName(segmentId); loadModel.setLoadMetadataDetails(listOfLoadFolderDetails); - loadModel.setSegmentId(segmentId); + // Segment id would be provided in case this is compaction flow for aggregate data map. + // If that is true then used the segment id as the load name. + if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !loadModel + .getSegmentId().isEmpty()) { + newMetaEntry.setLoadName(loadModel.getSegmentId()); + } else { + newMetaEntry.setLoadName(segmentId); + loadModel.setSegmentId(segmentId); + } // Exception should be thrown if: // 1. If insert overwrite is in progress and any other load or insert operation // is triggered
