[CARBONDATA-2076] Refactored code to segregate process metadata and process data in load command
This closes #1837 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3a6136df Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3a6136df Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3a6136df Branch: refs/heads/carbonstore Commit: 3a6136df066b9d0cdeecec283115e4a99d82a900 Parents: 7d43442 Author: kumarvishal <[email protected]> Authored: Fri Jan 19 17:22:28 2018 +0530 Committer: ravipesala <[email protected]> Committed: Thu Jan 25 16:28:34 2018 +0530 ---------------------------------------------------------------------- .../indexstore/BlockletDataMapIndexStore.java | 2 +- .../carbondata/core/util/SessionParams.java | 14 +- .../hadoop/api/CarbonOutputCommitter.java | 28 ++- .../spark/rdd/AggregateDataMapCompactor.scala | 39 ++-- .../spark/rdd/CarbonDataRDDFactory.scala | 6 +- .../spark/rdd/CarbonTableCompactor.scala | 4 +- .../spark/rdd/CompactionFactory.scala | 10 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 4 +- .../org/apache/spark/sql/CarbonSession.scala | 18 +- .../CarbonAlterTableCompactionCommand.scala | 53 ++--- .../management/CarbonInsertIntoCommand.scala | 24 ++- .../management/CarbonLoadDataCommand.scala | 111 ++++------- .../CreatePreAggregateTableCommand.scala | 66 ++++--- .../preaaggregate/PreAggregateListeners.scala | 194 +++++++++++++++---- .../preaaggregate/PreAggregateUtil.scala | 76 ++++---- .../CarbonCreateTableAsSelectCommand.scala | 27 +-- .../datasources/CarbonFileFormat.scala | 4 - .../sql/execution/strategy/DDLStrategy.scala | 2 +- .../apache/spark/sql/hive/CarbonRelation.scala | 16 +- .../processing/loading/events/LoadEvents.java | 19 ++ 20 files changed, 447 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index ad80fd7..111a7a2 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -138,7 +138,7 @@ public class BlockletDataMapIndexStore partitionFileStore.readAllPartitionsOfSegment(carbonFiles, segmentPath); partitionFileStoreMap.put(identifier.getSegmentId(), partitionFileStore); for (CarbonFile file : carbonFiles) { - blockMetaInfoMap.put(file.getAbsolutePath(), + blockMetaInfoMap.put(FileFactory.getUpdatedFilePath(file.getAbsolutePath()), new BlockMetaInfo(file.getLocations(), file.getSize())); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java index 3f0e856..afbd947 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -52,10 +52,19 @@ public class SessionParams implements Serializable { private Map<String, String> sProps; private Map<String, String> addedProps; - + private Map<String, Object> extraInfo; public SessionParams() { sProps = new HashMap<>(); addedProps = new HashMap<>(); + extraInfo = new HashMap<>(); + } + + public void setExtraInfo(String key, Object value) { + this.extraInfo.put(key, value); + } + + public Object getExtraInfo(String key) { + return this.extraInfo.get(key); } /** @@ -198,6 +207,9 @@ public class SessionParams implements 
Serializable { sProps.remove(property); } + public void removeExtraInfo(String key) { + extraInfo.remove(key); + } /** * clear the set properties */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index eb18bbd..f6e928d 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -18,7 +18,10 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -30,11 +33,12 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonSessionInfo; +import org.apache.carbondata.core.util.ThreadLocalSessionInfo; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter; import org.apache.carbondata.events.OperationContext; import org.apache.carbondata.events.OperationListenerBus; -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; import org.apache.carbondata.processing.loading.events.LoadEvents; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.util.CarbonLoaderUtil; @@ -106,18 
+110,13 @@ public class CarbonOutputCommitter extends FileOutputCommitter { long segmentSize = CarbonLoaderUtil .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable); if (segmentSize > 0 || overwriteSet) { - String operationContextStr = - context.getConfiguration().get( - CarbonTableOutputFormat.OPERATION_CONTEXT, - null); - if (operationContextStr != null) { - OperationContext operationContext = - (OperationContext) ObjectSerializationUtil.convertStringToObject(operationContextStr); + Object operationContext = getOperationContext(); + if (operationContext != null) { LoadEvents.LoadTablePreStatusUpdateEvent event = new LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(), loadModel); try { - OperationListenerBus.getInstance().fireEvent(event, operationContext); + OperationListenerBus.getInstance().fireEvent(event, (OperationContext) operationContext); } catch (Exception e) { throw new IOException(e); } @@ -145,6 +144,15 @@ public class CarbonOutputCommitter extends FileOutputCommitter { } } + private Object getOperationContext() { + // when validate segments is disabled in thread local update it to CarbonTableInputFormat + CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo(); + if (carbonSessionInfo != null) { + return carbonSessionInfo.getThreadParams().getExtraInfo("partition.operationcontext"); + } + return null; + } + /** * Merge index files to a new single file. 
*/ http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala index 636d731..5f8f389 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala @@ -24,11 +24,11 @@ import org.apache.spark.sql.{CarbonSession, SQLContext} import org.apache.spark.sql.execution.command.CompactionModel import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil -import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.events.OperationContext import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} @@ -39,7 +39,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel, compactionModel: CompactionModel, executor: ExecutorService, sqlContext: SQLContext, - storeLocation: String) + storeLocation: String, + operationContext: OperationContext) extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) { override def executeCompaction(): Unit = { @@ -57,30 +58,17 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel, CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + 
carbonLoadModel.getDatabaseName + "." + carbonLoadModel.getTableName, "false") - val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala - .map(_.getColumnName).mkString(",") - // Creating a new query string to insert data into pre-aggregate table from that same table. - // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1 - // select * from preaggtable1 - // The following code will generate the select query with a load UDF that will be used to - // apply DataLoadingRules - val childDataFrame = sqlContext.sparkSession.sql(new CarbonSpark2SqlParser() - // adding the aggregation load UDF - .addPreAggLoadFunction( - // creating the select query on the bases on table schema - PreAggregateUtil.createChildSelectQuery( - carbonTable.getTableInfo.getFactTable, carbonTable.getDatabaseName))).drop("preAggLoad") + CarbonSession.updateSessionInfoToCurrentThread(sqlContext.sparkSession) + val loadCommand = operationContext.getProperty(carbonTable.getTableName + "_Compaction") + .asInstanceOf[CarbonLoadDataCommand] try { - CarbonLoadDataCommand( - Some(carbonTable.getDatabaseName), - carbonTable.getTableName, - null, - Nil, - Map("fileheader" -> headers), - isOverwriteTable = false, - dataFrame = Some(childDataFrame), - internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true", - "mergedSegmentName" -> mergedLoadName)).run(sqlContext.sparkSession) + val newInternalOptions = loadCommand.internalOptions ++ + Map("mergedSegmentName" -> mergedLoadName) + loadCommand.internalOptions = newInternalOptions + loadCommand.dataFrame = + Some(PreAggregateUtil.getDataFrame( + sqlContext.sparkSession, loadCommand.logicalPlan.get)) + loadCommand.processData(sqlContext.sparkSession) val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata( carbonTable.getMetaDataFilepath) val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect { @@ -103,6 +91,7 @@ class AggregateDataMapCompactor(carbonLoadModel: 
CarbonLoadModel, // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if threshhold // allows it. if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) { + executeCompaction() } CarbonSession http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index f37fbd7..809c8ff 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -173,7 +173,8 @@ object CarbonDataRDDFactory { compactionModel, executor, sqlContext, - storeLocation) + storeLocation, + operationContext) try { // compaction status of the table which is triggered by the user. 
var triggeredCompactionStatus = false @@ -225,7 +226,8 @@ object CarbonDataRDDFactory { newcompactionModel, executor, sqlContext, - storeLocation).executeCompaction() + storeLocation, + operationContext).executeCompaction() } catch { case e: Exception => LOGGER.error("Exception in compaction thread for table " + http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index 0dc856d..a0c8f65 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -45,7 +45,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, compactionModel: CompactionModel, executor: ExecutorService, sqlContext: SQLContext, - storeLocation: String) + storeLocation: String, + operationContext: OperationContext) extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) { override def executeCompaction(): Unit = { @@ -170,7 +171,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, carbonLoadModel.setLoadMetadataDetails( SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava) // trigger event for compaction - val operationContext = new OperationContext val alterTableCompactionPreEvent: AlterTableCompactionPreEvent = AlterTableCompactionPreEvent(sqlContext.sparkSession, carbonTable, http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala ---------------------------------------------------------------------- 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala index 6060f06..8508d2a 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService import org.apache.spark.sql.SQLContext import org.apache.spark.sql.execution.command.CompactionModel +import org.apache.carbondata.events.OperationContext import org.apache.carbondata.processing.loading.model.CarbonLoadModel object CompactionFactory { @@ -33,21 +34,24 @@ object CompactionFactory { compactionModel: CompactionModel, executor: ExecutorService, sqlContext: SQLContext, - storeLocation: String): Compactor = { + storeLocation: String, + operationContext: OperationContext): Compactor = { if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) { new AggregateDataMapCompactor( carbonLoadModel, compactionModel, executor, sqlContext, - storeLocation) + storeLocation, + operationContext) } else { new CarbonTableCompactor( carbonLoadModel, compactionModel, executor, sqlContext, - storeLocation) + storeLocation, + operationContext) } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index bbc3c2d..585fe67 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -32,7 +32,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util._ import org.apache.carbondata.events._ -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} import org.apache.carbondata.spark.rdd.SparkReadSupport import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl @@ -143,6 +143,8 @@ object CarbonEnv { .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener) .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent], AlterPreAggregateTableCompactionPostListener) + .addListener(classOf[LoadMetadataEvent], LoadProcessMetaListener) + .addListener(classOf[LoadMetadataEvent], CompactionProcessMetaListener) } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index c2c15fe..e95b8db 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -53,7 +53,7 @@ class CarbonSession(@transient val sc: SparkContext, * and a catalog that interacts with external systems. 
*/ @transient - override lazy val sharedState: SharedState = { + override lazy val sharedState: SharedState = { existingSharedState match { case Some(_) => val ss = existingSharedState.get @@ -214,17 +214,31 @@ object CarbonSession { ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfo) } + + def threadSet(key: String, value: Object): Unit = { + var currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo + if (currentThreadSessionInfo == null) { + currentThreadSessionInfo = new CarbonSessionInfo() + } + else { + currentThreadSessionInfo = currentThreadSessionInfo.clone() + } + currentThreadSessionInfo.getThreadParams.setExtraInfo(key, value) + ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfo) + } + def threadUnset(key: String): Unit = { val currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo if (currentThreadSessionInfo != null) { val currentThreadSessionInfoClone = currentThreadSessionInfo.clone() val threadParams = currentThreadSessionInfoClone.getThreadParams CarbonSetCommand.unsetValue(threadParams, key) + threadParams.removeExtraInfo(key) ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfoClone) } } - private[spark] def updateSessionInfoToCurrentThread(sparkSession: SparkSession): Unit = { + def updateSessionInfoToCurrentThread(sparkSession: SparkSession): Unit = { val carbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.clone() val currentThreadSessionInfoOrig = ThreadLocalSessionInfo.getCarbonSessionInfo if (currentThreadSessionInfoOrig != null) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 6af0e98..fb0f9fe 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.command.{AlterTableModel, CarbonMergerMapping, CompactionModel, DataCommand} +import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel, DataCommand} import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.CarbonException @@ -37,9 +37,10 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import 
org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.exception.ConcurrentOperationException @@ -53,34 +54,42 @@ import org.apache.carbondata.streaming.segment.StreamSegment */ case class CarbonAlterTableCompactionCommand( alterTableModel: AlterTableModel, - tableInfoOp: Option[TableInfo] = None) - extends DataCommand { + tableInfoOp: Option[TableInfo] = None, + val operationContext: OperationContext = new OperationContext ) extends AtomicRunnableCommand { - override def processData(sparkSession: SparkSession): Seq[Row] = { - val LOGGER: LogService = - LogServiceFactory.getLogService(this.getClass.getName) - val tableName = alterTableModel.tableName.toLowerCase - val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase) + var table: CarbonTable = _ - val table = if (tableInfoOp.isDefined) { - val tableInfo = tableInfoOp.get - // To DO: CarbonEnv.updateStorePath - CarbonTable.buildFromTableInfo(tableInfo) + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val tableName = alterTableModel.tableName.toLowerCase + val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase) + table = if (tableInfoOp.isDefined) { + CarbonTable.buildFromTableInfo(tableInfoOp.get) } else { - val relation = - CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(databaseName), tableName)(sparkSession) - .asInstanceOf[CarbonRelation] + val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] if (relation == null) { - throw new NoSuchTableException(databaseName, tableName) + throw new NoSuchTableException(dbName, tableName) } if (null == relation.carbonTable) { - LOGGER.error(s"alter table failed. 
table not found: $databaseName.$tableName") - throw new NoSuchTableException(databaseName, tableName) + LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName") + throw new NoSuchTableException(dbName, tableName) } relation.carbonTable } + if (CarbonUtil.hasAggregationDataMap(table) || + (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) { + val loadMetadataEvent = new LoadMetadataEvent(table, true) + OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext) + } + Seq.empty + } + override def processData(sparkSession: SparkSession): Seq[Row] = { + val LOGGER: LogService = + LogServiceFactory.getLogService(this.getClass.getName) + val tableName = alterTableModel.tableName.toLowerCase + val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase) val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table) if (isLoadInProgress) { val message = "Cannot run data loading and compaction on same table concurrently. 
" + @@ -88,7 +97,6 @@ case class CarbonAlterTableCompactionCommand( LOGGER.error(message) throw new ConcurrentOperationException(message) } - val carbonLoadModel = new CarbonLoadModel() carbonLoadModel.setTableName(table.getTableName) val dataLoadSchema = new CarbonDataLoadSchema(table) @@ -103,7 +111,6 @@ case class CarbonAlterTableCompactionCommand( System.getProperty("java.io.tmpdir")) storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() // trigger event for compaction - val operationContext = new OperationContext val alterTableCompactionPreEvent: AlterTableCompactionPreEvent = AlterTableCompactionPreEvent(sparkSession, table, null, null) OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext) @@ -242,8 +249,6 @@ case class CarbonAlterTableCompactionCommand( compactionModel.currentPartitions, null) - // trigger event for merge index - val operationContext = new OperationContext // trigger event for compaction val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent = AlterTableCompactionPreStatusUpdateEvent(sqlContext.sparkSession, http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala index 810b10f..626cdba 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.management import 
org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.command.DataCommand +import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataCommand} import org.apache.carbondata.spark.util.CarbonSparkUtil @@ -28,12 +28,14 @@ case class CarbonInsertIntoCommand( child: LogicalPlan, overwrite: Boolean, partition: Map[String, Option[String]]) - extends DataCommand { + extends AtomicRunnableCommand { - override def processData(sparkSession: SparkSession): Seq[Row] = { + var loadCommand: CarbonLoadDataCommand = _ + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val df = Dataset.ofRows(sparkSession, child) val header = relation.tableSchema.get.fields.map(_.name).mkString(",") - val load = CarbonLoadDataCommand( + loadCommand = CarbonLoadDataCommand( databaseNameOp = Some(relation.carbonRelation.databaseName), tableName = relation.carbonRelation.tableName, factPathFromUser = null, @@ -45,10 +47,14 @@ case class CarbonInsertIntoCommand( updateModel = None, tableInfoOp = None, internalOptions = Map.empty, - partition = partition).run(sparkSession) - // updating relation metadata. 
This is in case of auto detect high cardinality - relation.carbonRelation.metaData = - CarbonSparkUtil.createSparkMeta(relation.carbonRelation.carbonTable) - load + partition = partition) + loadCommand.processMetadata(sparkSession) + } + override def processData(sparkSession: SparkSession): Seq[Row] = { + if (null != loadCommand) { + loadCommand.processData(sparkSession) + } else { + Seq.empty + } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 6b43152..7afbd92 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -39,15 +39,14 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY -import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel} +import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel} import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.optimizer.CarbonFilters -import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} 
+import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} -import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -57,26 +56,24 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP import org.apache.carbondata.core.metadata.PartitionMapFileStore import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.util.path.{CarbonStorePath} import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException -import org.apache.carbondata.format import org.apache.carbondata.hadoop.util.ObjectSerializationUtil import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable} -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} -import org.apache.carbondata.processing.loading.exception.{BadRecordFoundException, NoRetryException} -import 
org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} +import org.apache.carbondata.processing.loading.exception.{NoRetryException} +import org.apache.carbondata.processing.loading.model.{CarbonLoadModel} import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD, DictionaryLoadModel} +import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD} import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil} case class CarbonLoadDataCommand( @@ -87,12 +84,41 @@ case class CarbonLoadDataCommand( options: scala.collection.immutable.Map[String, String], isOverwriteTable: Boolean, var inputSqlString: String = null, - dataFrame: Option[DataFrame] = None, + var dataFrame: Option[DataFrame] = None, updateModel: Option[UpdateTableModel] = None, var tableInfoOp: Option[TableInfo] = None, - internalOptions: Map[String, String] = Map.empty, - partition: Map[String, Option[String]] = Map.empty) extends DataCommand { + var internalOptions: Map[String, String] = Map.empty, + partition: Map[String, Option[String]] = Map.empty, + logicalPlan: Option[LogicalPlan] = None, + var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand { + var table: CarbonTable = _ + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) 
+ val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) + table = if (tableInfoOp.isDefined) { + CarbonTable.buildFromTableInfo(tableInfoOp.get) + } else { + val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + if (relation == null) { + throw new NoSuchTableException(dbName, tableName) + } + if (null == relation.carbonTable) { + LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName") + LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName") + throw new NoSuchTableException(dbName, tableName) + } + relation.carbonTable + } + operationContext.setProperty("isOverwrite", isOverwriteTable) + if(CarbonUtil.hasAggregationDataMap(table)) { + val loadMetadataEvent = new LoadMetadataEvent(table, false) + OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext) + } + Seq.empty + } override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val carbonProperty: CarbonProperties = CarbonProperties.getInstance() @@ -121,22 +147,6 @@ case class CarbonLoadDataCommand( val hadoopConf = sparkSession.sessionState.newHadoopConf() val carbonLoadModel = new CarbonLoadModel() try { - val table = if (tableInfoOp.isDefined) { - CarbonTable.buildFromTableInfo(tableInfoOp.get) - } else { - val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] - if (relation == null) { - throw new NoSuchTableException(dbName, tableName) - } - if (null == relation.carbonTable) { - LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName") - LOGGER.audit(s"Data loading failed. 
table not found: $dbName.$tableName") - throw new NoSuchTableException(dbName, tableName) - } - relation.carbonTable - } - val tableProperties = table.getTableInfo.getFactTable.getTableProperties val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options) optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", @@ -167,7 +177,6 @@ case class CarbonLoadDataCommand( TableProcessingOperations.deletePartialLoadDataIfExist(table, false) var isUpdateTableStatusRequired = false try { - val operationContext = new OperationContext val loadTablePreExecutionEvent: LoadTablePreExecutionEvent = new LoadTablePreExecutionEvent( table.getCarbonTableIdentifier, @@ -181,7 +190,6 @@ case class CarbonLoadDataCommand( OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext) // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") - GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata // Clean up the old invalid segment data before creating a new entry for new load. 
DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table) // add the start entry for the new load in the table status file @@ -525,6 +533,7 @@ case class CarbonLoadDataCommand( CarbonSession.threadSet( CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, isEmptyBadRecord) + CarbonSession.threadSet("partition.operationcontext", operationContext) try { val query: LogicalPlan = if (dataFrame.isDefined) { val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1 @@ -649,6 +658,7 @@ case class CarbonLoadDataCommand( CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT) CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION) CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD) + CarbonSession.threadUnset("partition.operationcontext") } try { // Trigger auto compaction @@ -718,7 +728,6 @@ case class CarbonLoadDataCommand( val dataSchema = StructType(metastoreSchema .filterNot(field => partitionSchema.contains(field.name))) - val operationContextStr = ObjectSerializationUtil.convertObjectToString(operationContext) val options = new mutable.HashMap[String, String]() options ++= catalogTable.storage.properties options += (("overwrite", overWriteLocal.toString)) @@ -731,7 +740,6 @@ case class CarbonLoadDataCommand( partition.map{case (col, value) => (col.toLowerCase, value.isDefined)}.asJava)) options += (("staticpartition", staticPartitionStr)) } - options += (("operationcontext", operationContextStr)) options ++= this.options if (updateModel.isDefined) { options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString)) @@ -869,41 +877,4 @@ case class CarbonLoadDataCommand( val dataFrameWithTupleId = dataFrame.get.select(fieldWithTupleId: _*) (dataFrameWithTupleId) } - - private def updateTableMetadata( - carbonLoadModel: CarbonLoadModel, - sqlContext: SQLContext, - model: DictionaryLoadModel, - noDictDimension: 
Array[CarbonDimension]): Unit = { - val sparkSession = sqlContext.sparkSession - val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.table) - - val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - // read TableInfo - val tableInfo: format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) - - // modify TableInfo - val columns = tableInfo.getFact_table.getTable_columns - for (i <- 0 until columns.size) { - if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) { - columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY) - } - } - val entry = tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) - entry.setTime_stamp(System.currentTimeMillis()) - - // write TableInfo - metastore.updateTableSchemaForAlter(carbonTablePath.getCarbonTableIdentifier, - carbonTablePath.getCarbonTableIdentifier, - tableInfo, entry, carbonTablePath.getPath)(sparkSession) - - // update the schema modified time - metastore.updateAndTouchSchemasUpdatedTime() - - val identifier = model.table.getCarbonTableIdentifier - // update CarbonDataLoadSchema - val carbonTable = metastore.lookupRelation(Option(identifier.getDatabaseName), - identifier.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].carbonTable - carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 56f298a..c5340c2 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -24,13 +24,13 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand +import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema -import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable} import org.apache.carbondata.core.statusmanager.SegmentStatusManager /** @@ -51,6 +51,7 @@ case class CreatePreAggregateTableCommand( extends AtomicRunnableCommand { var parentTable: CarbonTable = _ + var loadCommand: CarbonLoadDataCommand = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString) @@ -64,8 +65,6 @@ case class CreatePreAggregateTableCommand( parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan) assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table), "Parent table name is different in select and create") - - var neworder = Seq[String]() val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala parentOrder.foreach(parentcol => @@ -80,7 +79,9 @@ case class CreatePreAggregateTableCommand( .LOAD_SORT_SCOPE_DEFAULT)) tableProperties 
.put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString) - + val tableIdentifier = + TableIdentifier(parentTableIdentifier.table + "_" + dataMapName, + parentTableIdentifier.database) // prepare table model of the collected tokens val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel( ifNotExistPresent = false, @@ -137,7 +138,30 @@ case class CreatePreAggregateTableCommand( // to be used in further create process. parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database, parentTableIdentifier.table)(sparkSession) - + val updatedLoadQuery = if (timeSeriesFunction.isDefined) { + val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala + .filter(p => p.getDataMapName + .equalsIgnoreCase(dataMapName)).head + .asInstanceOf[AggregationDataMapSchema] + PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema, + parentTable.getTableName, + parentTable.getDatabaseName) + } + else { + queryString + } + val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( + updatedLoadQuery)).drop("preAggLoad") + val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala + .filter(dataMap => dataMap.getDataMapName.equalsIgnoreCase(dataMapName)).head + .asInstanceOf[AggregationDataMapSchema] + loadCommand = PreAggregateUtil.createLoadCommandForChild( + dataMap.getChildSchema.getListOfColumns, + tableIdentifier, + dataFrame, + false, + sparkSession = sparkSession) + loadCommand.processMetadata(sparkSession) Seq.empty } @@ -159,35 +183,19 @@ case class CreatePreAggregateTableCommand( val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath) .nonEmpty if (loadAvailable) { - val updatedQuery = if (timeSeriesFunction.isDefined) { - val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala - .filter(p => p.getDataMapName - .equalsIgnoreCase(dataMapName)).head - .asInstanceOf[AggregationDataMapSchema] - 
PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema, - parentTable.getTableName, - parentTable.getDatabaseName) - } else { - queryString - } // Passing segmentToLoad as * because we want to load all the segments into the // pre-aggregate table even if the user has set some segments on the parent table. + loadCommand.dataFrame = Some(PreAggregateUtil + .getDataFrame(sparkSession, loadCommand.logicalPlan.get)) PreAggregateUtil.startDataLoadForDataMap( - parentTable, - tableIdentifier, - updatedQuery, - segmentToLoad = "*", - validateSegments = true, - isOverwrite = false, - sparkSession = sparkSession) + parentTable, + segmentToLoad = "*", + validateSegments = true, + sparkSession, + loadCommand) } Seq.empty } - - // Create the aggregation table name with parent table name prefix - private lazy val tableIdentifier = - TableIdentifier(parentTableIdentifier.table + "_" + dataMapName, parentTableIdentifier.database) - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index fce32ab..7b273ba 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -20,18 +20,153 @@ package org.apache.spark.sql.execution.command.preaaggregate import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.sql.CarbonEnv -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{SparkSession} 
import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.AlterTableModel -import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand +import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand} +import org.apache.spark.sql.parser.CarbonSpark2SqlParser -import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events._ -import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} +/** + * below class will be used to create load command for compaction + * for all the pre agregate child data map + */ +object CompactionProcessMetaListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext): Unit = { + val sparkSession = SparkSession.getActiveSession.get + val tableEvent = event.asInstanceOf[LoadMetadataEvent] + val table = tableEvent.getCarbonTable + if (!table.isChildDataMap && CarbonUtil.hasAggregationDataMap(table)) { + val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala + .filter(_.isInstanceOf[AggregationDataMapSchema]) + .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]] + for (dataMapSchema: AggregationDataMapSchema <- aggregationDataMapList) { + val childTableName = dataMapSchema.getRelationIdentifier.getTableName + val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName + // Creating a new query string to insert data into pre-aggregate table 
from that same table. + // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1 + // select * from preaggtable1 + // The following code will generate the select query with a load UDF that will be used to + // apply DataLoadingRules + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() + // adding the aggregation load UDF + .addPreAggLoadFunction( + // creating the select query on the bases on table schema + PreAggregateUtil.createChildSelectQuery( + dataMapSchema.getChildSchema, table.getDatabaseName))).drop("preAggLoad") + val loadCommand = PreAggregateUtil.createLoadCommandForChild( + dataMapSchema.getChildSchema.getListOfColumns, + TableIdentifier(childTableName, Some(childDatabaseName)), + childDataFrame, + false, + sparkSession) + loadCommand.processMetadata(sparkSession) + operationContext + .setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction", loadCommand) + } + } else if (table.isChildDataMap) { + val childTableName = table.getTableName + val childDatabaseName = table.getDatabaseName + // Creating a new query string to insert data into pre-aggregate table from that same table. 
+ // For example: To compact preaggtable1 we can fire a query like insert into preaggtable1 + // select * from preaggtable1 + // The following code will generate the select query with a load UDF that will be used to + // apply DataLoadingRules + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() + // adding the aggregation load UDF + .addPreAggLoadFunction( + // creating the select query on the bases on table schema + PreAggregateUtil.createChildSelectQuery( + table.getTableInfo.getFactTable, table.getDatabaseName))).drop("preAggLoad") + val loadCommand = PreAggregateUtil.createLoadCommandForChild( + table.getTableInfo.getFactTable.getListOfColumns, + TableIdentifier(childTableName, Some(childDatabaseName)), + childDataFrame, + false, + sparkSession) + loadCommand.processMetadata(sparkSession) + operationContext.setProperty(table.getTableName + "_Compaction", loadCommand) + } + } +} + +/** + * Below class to is to create LoadCommand for loading the + * the data of pre aggregate data map + */ +object LoadProcessMetaListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override protected def onEvent(event: Event, + operationContext: OperationContext): Unit = { + val sparkSession = SparkSession.getActiveSession.get + val tableEvent = event.asInstanceOf[LoadMetadataEvent] + if (!tableEvent.isCompaction) { + val table = tableEvent.getCarbonTable + if (CarbonUtil.hasAggregationDataMap(table)) { + // getting all the aggergate datamap schema + val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala + .filter(_.isInstanceOf[AggregationDataMapSchema]) + .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]] + // sorting the datamap for timeseries rollup + val sortedList = aggregationDataMapList.sortBy(_.getOrdinal) + val parentTableName = table.getTableName + val databaseName = table.getDatabaseName + val list = 
scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema] + for (dataMapSchema: AggregationDataMapSchema <- sortedList) { + val childTableName = dataMapSchema.getRelationIdentifier.getTableName + val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName + val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) { + PreAggregateUtil.getChildQuery(dataMapSchema) + } else { + // for timeseries rollup policy + val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list, + dataMapSchema) + list += dataMapSchema + // if non of the rollup data map is selected hit the maintable and prepare query + if (tableSelectedForRollup.isEmpty) { + PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema, + parentTableName, + databaseName) + } else { + // otherwise hit the select rollup datamap schema + PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema, + tableSelectedForRollup.get, + databaseName) + } + } + val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( + childSelectQuery)).drop("preAggLoad") + val isOverwrite = + operationContext.getProperty("isOverwrite").asInstanceOf[Boolean] + val loadCommand = PreAggregateUtil.createLoadCommandForChild( + dataMapSchema.getChildSchema.getListOfColumns, + TableIdentifier(childTableName, Some(childDatabaseName)), + childDataFrame, + isOverwrite, + sparkSession) + loadCommand.processMetadata(sparkSession) + operationContext.setProperty(dataMapSchema.getChildSchema.getTableName, loadCommand) + } + } + } + } +} object LoadPostAggregateListener extends OperationEventListener { /** * Called on a specified event occurrence @@ -42,8 +177,7 @@ object LoadPostAggregateListener extends OperationEventListener { val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent] val sparkSession = SparkSession.getActiveSession.get val carbonLoadModel = loadEvent.getCarbonLoadModel - val table = 
CarbonEnv.getCarbonTable(Option(carbonLoadModel.getDatabaseName), - carbonLoadModel.getTableName)(sparkSession) + val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable if (CarbonUtil.hasAggregationDataMap(table)) { // getting all the aggergate datamap schema val aggregationDataMapList = table.getTableInfo.getDataMapSchemaList.asScala @@ -51,41 +185,28 @@ object LoadPostAggregateListener extends OperationEventListener { .asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]] // sorting the datamap for timeseries rollup val sortedList = aggregationDataMapList.sortBy(_.getOrdinal) - val parentTableName = table.getTableName - val databasename = table.getDatabaseName - val list = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema] for (dataMapSchema: AggregationDataMapSchema <- sortedList) { - val childTableName = dataMapSchema.getRelationIdentifier.getTableName - val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName - val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) { - PreAggregateUtil.getChildQuery(dataMapSchema) - } else { - // for timeseries rollup policy - val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list, - dataMapSchema) - list += dataMapSchema - // if non of the rollup data map is selected hit the maintable and prepare query - if (tableSelectedForRollup.isEmpty) { - PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema, - parentTableName, - databasename) - } else { - // otherwise hit the select rollup datamap schema - PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema, - tableSelectedForRollup.get, - databasename) - } - } + val childLoadCommand = operationContext + .getProperty(dataMapSchema.getChildSchema.getTableName) + .asInstanceOf[CarbonLoadDataCommand] + childLoadCommand.dataFrame = Some(PreAggregateUtil + .getDataFrame(sparkSession, childLoadCommand.logicalPlan.get)) + val 
childOperationContext = new OperationContext + childOperationContext + .setProperty(dataMapSchema.getChildSchema.getTableName, + operationContext.getProperty(dataMapSchema.getChildSchema.getTableName)) val isOverwrite = operationContext.getProperty("isOverwrite").asInstanceOf[Boolean] + childOperationContext.setProperty("isOverwrite", isOverwrite) + childOperationContext.setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction", + operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction")) + childLoadCommand.operationContext = childOperationContext PreAggregateUtil.startDataLoadForDataMap( table, - TableIdentifier(childTableName, Some(childDatabaseName)), - childSelectQuery, carbonLoadModel.getSegmentId, validateSegments = false, - isOverwrite, - sparkSession) + sparkSession, + childLoadCommand) } } } @@ -115,7 +236,8 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen compactionType.toString, Some(System.currentTimeMillis()), "") - CarbonAlterTableCompactionCommand(alterTableModel).run(sparkSession) + CarbonAlterTableCompactionCommand(alterTableModel, operationContext = operationContext) + .run(sparkSession) } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index cd19e3b..dac5d5e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -19,7 +19,7 @@ package 
org.apache.spark.sql.execution.command.preaaggregate import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession} +import org.apache.spark.sql._ import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCastExpression} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation} @@ -31,13 +31,14 @@ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.parser.CarbonSpark2SqlParser -import org.apache.spark.sql.types.{DataType, LongType} +import org.apache.spark.sql.types.DataType import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.format.TableInfo @@ -576,18 +577,15 @@ object PreAggregateUtil { } updatedPlan } - - /** + /** * This method will start load process on the data map */ def startDataLoadForDataMap( parentCarbonTable: CarbonTable, - dataMapIdentifier: TableIdentifier, - queryString: String, segmentToLoad: String, validateSegments: Boolean, - isOverwrite: Boolean, - sparkSession: SparkSession): Unit = { + sparkSession: SparkSession, + loadCommand: 
CarbonLoadDataCommand): Unit = { CarbonSession.threadSet( CarbonCommonConstants.CARBON_INPUT_SEGMENTS + parentCarbonTable.getDatabaseName + "." + @@ -597,32 +595,9 @@ object PreAggregateUtil { CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + parentCarbonTable.getDatabaseName + "." + parentCarbonTable.getTableName, validateSegments.toString) - val dataMapSchemas = parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala - val headers = dataMapSchemas.find(_.getChildSchema.getTableName.equalsIgnoreCase( - dataMapIdentifier.table)) match { - case Some(dataMapSchema) => - val columns = dataMapSchema.getChildSchema.getListOfColumns.asScala - .filter{column => - !column.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)} - columns.sortBy(_.getSchemaOrdinal).map( - _.getColumnName).mkString(",") - case None => - throw new RuntimeException( - s"${ dataMapIdentifier.table} datamap not found in DataMapSchema list: ${ - dataMapSchemas.map(_.getChildSchema.getTableName).mkString("[", ",", "]")}") - } - val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( - queryString)).drop("preAggLoad") + CarbonSession.updateSessionInfoToCurrentThread(sparkSession) try { - CarbonLoadDataCommand(dataMapIdentifier.database, - dataMapIdentifier.table, - null, - Nil, - Map("fileheader" -> headers), - isOverwriteTable = isOverwrite, - dataFrame = Some(dataFrame), - internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")). 
- run(sparkSession) + loadCommand.processData(sparkSession) } finally { CarbonSession.threadUnset( CarbonCommonConstants.CARBON_INPUT_SEGMENTS + @@ -647,11 +622,12 @@ object PreAggregateUtil { case _ => a.getAggFunction}}(${a.getColumnName})" } else { groupingExpressions += a.getColumnName + aggregateColumns+= a.getColumnName } } - s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",") - } from $databaseName.${ tableSchema.getTableName } group by ${ - groupingExpressions.mkString(",") }" + s"select ${ aggregateColumns.mkString(",") } " + + s"from $databaseName.${ tableSchema.getTableName }" + + s" group by ${ groupingExpressions.mkString(",") }" } /** @@ -900,4 +876,32 @@ object PreAggregateUtil { aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY").replace("&", "=")), CarbonCommonConstants.DEFAULT_CHARSET) } + + /** + * This method will start load process on the data map + */ + def createLoadCommandForChild( + columns: java.util.List[ColumnSchema], + dataMapIdentifier: TableIdentifier, + dataFrame: DataFrame, + isOverwrite: Boolean, + sparkSession: SparkSession): CarbonLoadDataCommand = { + val headers = columns.asScala.filter { column => + !column.getColumnName.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE) + }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",") + val loadCommand = CarbonLoadDataCommand(dataMapIdentifier.database, + dataMapIdentifier.table, + null, + Nil, + Map("fileheader" -> headers), + isOverwriteTable = isOverwrite, + dataFrame = None, + internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"), + logicalPlan = Some(dataFrame.queryExecution.logical)) + loadCommand + } + + def getDataFrame(sparkSession: SparkSession, child: LogicalPlan): DataFrame = { + Dataset.ofRows(sparkSession, child) + } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala index 26a8f6f..19c265d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala @@ -42,15 +42,12 @@ case class CarbonCreateTableAsSelectCommand( ifNotExistsSet: Boolean = false, tableLocation: Option[String] = None) extends AtomicRunnableCommand { - /** - * variable to be used for insert into command for checking whether the - * table is created newly or already existed - */ - var isTableCreated: Boolean = false + var loadCommand: CarbonInsertIntoCommand = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val tableName = tableInfo.getFactTable.getTableName + var isTableCreated = false var databaseOpt: Option[String] = None if (tableInfo.getDatabaseName != null) { databaseOpt = Some(tableInfo.getDatabaseName) @@ -71,10 +68,7 @@ case class CarbonCreateTableAsSelectCommand( CarbonCreateTableCommand(tableInfo, ifNotExistsSet, tableLocation).run(sparkSession) isTableCreated = true } - Seq.empty - } - override def processData(sparkSession: SparkSession): Seq[Row] = { if (isTableCreated) { val tableName = tableInfo.getFactTable.getTableName var databaseOpt: Option[String] = None @@ -87,12 +81,23 @@ case class CarbonCreateTableAsSelectCommand( 
.createCarbonDataSourceHadoopRelation(sparkSession, TableIdentifier(tableName, Option(dbName))) // execute command to load data into carbon table - CarbonInsertIntoCommand( + loadCommand = CarbonInsertIntoCommand( carbonDataSourceHadoopRelation, query, overwrite = false, - partition = Map.empty).run(sparkSession) - LOGGER.audit(s"CTAS operation completed successfully for $dbName.$tableName") + partition = Map.empty) + loadCommand.processMetadata(sparkSession) + } + Seq.empty + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + if (null != loadCommand) { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + loadCommand.processData(sparkSession) + val carbonTable = loadCommand.relation.carbonTable + LOGGER.audit(s"CTAS operation completed successfully for " + + s"${carbonTable.getDatabaseName}.${carbonTable.getTableName}") } Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index d74e461..99e5732 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -127,10 +127,6 @@ with Serializable { if (segemntsTobeDeleted.isDefined) { conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get) } - val operationContextStr = options.get("operationcontext") - if (operationContextStr.isDefined) { - conf.set(CarbonTableOutputFormat.OPERATION_CONTEXT, operationContextStr.get) - } CarbonTableOutputFormat.setLoadModel(conf, model) new 
OutputWriterFactory { http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index f058e96..57be754 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -95,7 +95,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { ExecutedCommandExec(createDb) :: Nil case drop@DropDatabaseCommand(dbName, ifExists, isCascade) => ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil - case alterTable@CarbonAlterTableCompactionCommand(altertablemodel, _) => + case alterTable@CarbonAlterTableCompactionCommand(altertablemodel, _, _) => val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .tableExists(TableIdentifier(altertablemodel.tableName, altertablemodel.dbName))(sparkSession) http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala index 87be2d2..b8608f4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala @@ -34,7 +34,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.CarbonTable import 
org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} /** * Represents logical plan for one carbon table @@ -204,7 +204,6 @@ case class CarbonRelation( def sizeInBytes: Long = { val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime( carbonTable.getAbsoluteTableIdentifier) - if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) { if (new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier) .getValidAndInvalidSegments.getValidSegments.isEmpty) { @@ -215,8 +214,19 @@ case class CarbonRelation( carbonTable.getCarbonTableIdentifier).getPath val fileType = FileFactory.getFileType(tablePath) if (FileFactory.isFileExist(tablePath, fileType)) { + // get the valid segments + val segments = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier) + .getValidAndInvalidSegments.getValidSegments.asScala + var size = 0L + // for each segment calculate the size + segments.foreach {validSeg => + size = size + FileFactory.getDirectorySize( + CarbonTablePath.getSegmentPath(tablePath, validSeg)) + } + // update the new table status time tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime - sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath) + // update the new size + sizeInBytesLocalValue = size } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a6136df/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java index b00a67e..78964e7 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java @@ -20,6 +20,7 @@ package org.apache.carbondata.processing.loading.events; import java.util.Map; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.events.Event; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -129,6 +130,24 @@ public class LoadEvents { } /** + * Load Event class will be fired from the Load and compaction class + * to create all the load commands for all preaggregate data maps + */ + public static class LoadMetadataEvent extends Event { + private CarbonTable carbonTable; + private boolean isCompaction; + public LoadMetadataEvent(CarbonTable carbonTable, boolean isCompaction) { + this.carbonTable = carbonTable; + this.isCompaction = isCompaction; + } + public boolean isCompaction() { + return isCompaction; + } + public CarbonTable getCarbonTable() { + return carbonTable; + } + } + /** * Class for handling clean up in case of any failure and abort the operation. */
