http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala deleted file mode 100644 index cb35960..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala +++ /dev/null @@ -1,842 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.command - -import java.util - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.storage.StorageLevel - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum} -import org.apache.carbondata.core.mutate.data.RowCountDetailsVO -import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager} -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} -import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl -import org.apache.carbondata.processing.exception.MultipleMatchingException -import org.apache.carbondata.processing.loading.FailureCauses -import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType} -import org.apache.carbondata.spark.DeleteDelataResultImpl -import org.apache.carbondata.spark.util.QueryPlanUtil - - -/** - * IUD update delete and compaction framework. 
- * - */ - -private[sql] case class ProjectForDeleteCommand( - plan: LogicalPlan, - identifier: Seq[String], - timestamp: String) extends RunnableCommand { - - val LOG = LogServiceFactory.getLogService(this.getClass.getName) - var horizontalCompactionFailed = false - - override def run(sqlContext: SQLContext): Seq[Row] = { - - val dataFrame = DataFrame(sqlContext, plan) - val dataRdd = dataFrame.rdd - - val relation = CarbonEnv.get.carbonMetastore - .lookupRelation1(deleteExecution.getTableIdentifier(identifier))(sqlContext). - asInstanceOf[CarbonRelation] - val carbonTable = relation.tableMeta.carbonTable - val metadataLock = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - LockUsage.METADATA_LOCK) - var lockStatus = false - try { - lockStatus = metadataLock.lockWithRetries() - LOG.audit(s" Delete data request has been received " + - s"for ${ relation.databaseName }.${ relation.tableName }.") - if (lockStatus) { - LOG.info("Successfully able to get the table metadata file lock") - } - else { - throw new Exception("Table is locked for deletion. Please try after some time") - } - val tablePath = CarbonStorePath.getCarbonTablePath( - carbonTable.getStorePath, - carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier) - var executorErrors = new ExecutionErrors(FailureCauses.NONE, "") - - // handle the clean up of IUD. - CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) - - if (deleteExecution - .deleteDeltaExecution(identifier, sqlContext, dataRdd, timestamp, relation, - false, executorErrors)) { - // call IUD Compaction. - IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = false) - } - } catch { - case e: HorizontalCompactionException => - LOG.error("Delete operation passed. Exception in Horizontal Compaction." + - " Please check logs. " + e.getMessage) - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) - - case e: Exception => - LOG.error("Exception in Delete data operation " + e.getMessage) - // ****** start clean up. - // In case of failure , clean all related delete delta files - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) - - // clean up. Null check is required as for executor error some times message is null - if (null != e.getMessage) { - sys.error("Delete data operation is failed. " + e.getMessage) - } - else { - sys.error("Delete data operation is failed. Please check logs.") - } - } finally { - if (lockStatus) { - CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK) - } - } - Seq.empty - } -} - -private[sql] case class ProjectForUpdateCommand( - plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand { - val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName) - - override def run(sqlContext: SQLContext): Seq[Row] = { - - val res = plan find { - case relation: LogicalRelation if (relation.relation - .isInstanceOf[CarbonDatasourceRelation]) => - true - case _ => false - } - - if (!res.isDefined) { - return Seq.empty - } - - val relation = CarbonEnv.get.carbonMetastore - .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext). - asInstanceOf[CarbonRelation] - val carbonTable = relation.tableMeta.carbonTable - val metadataLock = CarbonLockFactory - .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - LockUsage.METADATA_LOCK) - var lockStatus = false - // get the current time stamp which should be same for delete and update. 
- val currentTime = CarbonUpdateUtil.readCurrentTime - var dataFrame: DataFrame = null - val isPersistEnabledUserValue = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.isPersistEnabled, - CarbonCommonConstants.defaultValueIsPersistEnabled) - var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean - if (isPersistEnabledUserValue.equalsIgnoreCase("false")) { - isPersistEnabled = false - } - else if (isPersistEnabledUserValue.equalsIgnoreCase("true")) { - isPersistEnabled = true - } - try { - lockStatus = metadataLock.lockWithRetries() - if (lockStatus) { - logInfo("Successfully able to get the table metadata file lock") - } - else { - throw new Exception("Table is locked for updation. Please try after some time") - } - val tablePath = CarbonStorePath.getCarbonTablePath( - carbonTable.getStorePath, - carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier) - // Get RDD. - dataFrame = if (isPersistEnabled) { - DataFrame(sqlContext, plan) - .persist(StorageLevel.MEMORY_AND_DISK) - } - else { - DataFrame(sqlContext, plan) - } - var executionErrors = new ExecutionErrors(FailureCauses.NONE, "") - - - // handle the clean up of IUD. - CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false) - - // do delete operation. - deleteExecution.deleteDeltaExecution(tableIdentifier, sqlContext, dataFrame.rdd, - currentTime + "", - relation, isUpdateOperation = true, executionErrors) - - if(executionErrors.failureCauses != FailureCauses.NONE) { - throw new Exception(executionErrors.errorMsg) - } - - // do update operation. - UpdateExecution.performUpdate(dataFrame, tableIdentifier, plan, - sqlContext, currentTime, executionErrors) - - if(executionErrors.failureCauses != FailureCauses.NONE) { - throw new Exception(executionErrors.errorMsg) - } - - // Do IUD Compaction. - IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = true) - } - - catch { - case e: HorizontalCompactionException => - LOGGER.error( - "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e) - // In case of failure , clean all related delta files - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString) - - case e: Exception => - LOGGER.error("Exception in update operation" + e) - // ****** start clean up. - // In case of failure , clean all related delete delta files - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "") - - // *****end clean up. - if (null != e.getMessage) { - sys.error("Update operation failed. " + e.getMessage) - } - if (null != e.getCause && null != e.getCause.getMessage) { - sys.error("Update operation failed. " + e.getCause.getMessage) - } - sys.error("Update operation failed. please check logs.") - } - finally { - if (null != dataFrame && isPersistEnabled) { - dataFrame.unpersist() - } - if (lockStatus) { - CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK) - } - } - Seq.empty - } -} - -object IUDCommon { - - val LOG = LogServiceFactory.getLogService(this.getClass.getName) - - /** - * The method does horizontal compaction. After Update and Delete completion - * tryHorizontal compaction will be called. In case this method is called after - * Update statement then Update Compaction followed by Delete Compaction will be - * processed whereas for tryHorizontalCompaction called after Delete statement - * then only Delete Compaction will be processed. 
- * - * @param sqlContext - * @param carbonRelation - * @param isUpdateOperation - */ - def tryHorizontalCompaction(sqlContext: SQLContext, - carbonRelation: CarbonRelation, - isUpdateOperation: Boolean): Unit = { - - var ishorizontalCompaction = CarbonDataMergerUtil.isHorizontalCompactionEnabled() - - if (ishorizontalCompaction == false) { - return - } - - var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION - val carbonTable = carbonRelation.tableMeta.carbonTable - val (db, table) = (carbonTable.getDatabaseName, carbonTable.getFactTableName) - val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier - val updateTimeStamp = System.currentTimeMillis() - // To make sure that update and delete timestamps are not same, - // required to commit to status metadata and cleanup - val deleteTimeStamp = updateTimeStamp + 1 - - // get the valid segments - var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier) - - if (segLists == null || segLists.size() == 0) { - return - } - - // Should avoid reading Table Status file from Disk every time. Better to load it - // in-memory at the starting and pass it along the routines. The constructor of - // SegmentUpdateStatusManager reads the Table Status File and Table Update Status - // file and save the content in segmentDetails and updateDetails respectively. - val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager( - absTableIdentifier) - - if (isUpdateOperation == true) { - - // This is only update operation, perform only update compaction. - compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION - performUpdateDeltaCompaction(sqlContext, - compactionTypeIUD, - carbonTable, - absTableIdentifier, - segmentUpdateStatusManager, - updateTimeStamp, - segLists) - } - - // After Update Compaction perform delete compaction - compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION - segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier) - if (segLists == null || segLists.size() == 0) { - return - } - - // Delete Compaction - performDeleteDeltaCompaction(sqlContext, - compactionTypeIUD, - carbonTable, - absTableIdentifier, - segmentUpdateStatusManager, - deleteTimeStamp, - segLists) - } - - /** - * Update Delta Horizontal Compaction. - * - * @param sqlContext - * @param compactionTypeIUD - * @param carbonTable - * @param absTableIdentifier - * @param segLists - */ - private def performUpdateDeltaCompaction(sqlContext: SQLContext, - compactionTypeIUD: CompactionType, - carbonTable: CarbonTable, - absTableIdentifier: AbsoluteTableIdentifier, - segmentUpdateStatusManager: SegmentUpdateStatusManager, - factTimeStamp: Long, - segLists: util.List[String]): Unit = { - val db = carbonTable.getDatabaseName - val table = carbonTable.getFactTableName - // get the valid segments qualified for update compaction. - val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists, - absTableIdentifier, - segmentUpdateStatusManager, - compactionTypeIUD) - - if (validSegList.size() == 0) { - return - } - - LOG.info(s"Horizontal Update Compaction operation started for [${db}.${table}].") - LOG.audit(s"Horizontal Update Compaction operation started for [${db}.${table}].") - - try { - // Update Compaction. 
- val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName), - carbonTable.getFactTableName, - Some(segmentUpdateStatusManager), - CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString, - Some(factTimeStamp), - "") - - AlterTableCompaction(altertablemodel).run(sqlContext) - } - catch { - case e: Exception => - val msg = if (null != e.getMessage) { - e.getMessage - } else { - "Please check logs for more info" - } - throw new HorizontalCompactionException( - s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp) - } - LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].") - LOG.audit(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].") - } - - /** - * Delete Delta Horizontal Compaction. - * - * @param sqlContext - * @param compactionTypeIUD - * @param carbonTable - * @param absTableIdentifier - * @param segLists - */ - private def performDeleteDeltaCompaction(sqlContext: SQLContext, - compactionTypeIUD: CompactionType, - carbonTable: CarbonTable, - absTableIdentifier: AbsoluteTableIdentifier, - segmentUpdateStatusManager: SegmentUpdateStatusManager, - factTimeStamp: Long, - segLists: util.List[String]): Unit = { - - val db = carbonTable.getDatabaseName - val table = carbonTable.getFactTableName - val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists, - absTableIdentifier, - segmentUpdateStatusManager, - compactionTypeIUD) - - if (deletedBlocksList.size() == 0) { - return - } - - LOG.info(s"Horizontal Delete Compaction operation started for [${db}.${table}].") - LOG.audit(s"Horizontal Delete Compaction operation started for [${db}.${table}].") - - try { - - // Delete Compaction RDD - val rdd1 = sqlContext.sparkContext - .parallelize(deletedBlocksList.asScala.toSeq, deletedBlocksList.size()) - - val timestamp = factTimeStamp - val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails - val result = rdd1.mapPartitions(iter => - new Iterator[Seq[CarbonDataMergerUtilResult]] { - override def hasNext: Boolean = iter.hasNext - - override def next(): Seq[CarbonDataMergerUtilResult] = { - val segmentAndBlocks = iter.next - val segment = segmentAndBlocks.substring(0, segmentAndBlocks.lastIndexOf("/")) - val blockName = segmentAndBlocks - .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length) - - val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName, - absTableIdentifier, - updateStatusDetails, - timestamp) - - result.asScala.toList - - } - }).collect - - val resultList = ListBuffer[CarbonDataMergerUtilResult]() - result.foreach(x => { - x.foreach(y => { - resultList += y - }) - }) - - val updateStatus = CarbonDataMergerUtil.updateStatusFile(resultList.toList.asJava, - carbonTable, - timestamp.toString, - segmentUpdateStatusManager) - if (updateStatus == false) { - LOG.audit(s"Delete Compaction data operation is failed for [${db}.${table}].") - LOG.error("Delete Compaction data operation is failed.") - throw new HorizontalCompactionException( - s"Horizontal Delete Compaction Failed for [${db}.${table}] ." 
+ - s" Please check logs for more info.", factTimeStamp) - } - else { - LOG.info(s"Horizontal Delete Compaction operation completed for [${db}.${table}].") - LOG.audit(s"Horizontal Delete Compaction operation completed for [${db}.${table}].") - } - } - catch { - case e: Exception => - val msg = if (null != e.getMessage) { - e.getMessage - } else { - "Please check logs for more info" - } - throw new HorizontalCompactionException( - s"Horizontal Delete Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp) - } - } -} - -class HorizontalCompactionException( - message: String, - // required for cleanup - val compactionTimeStamp: Long) extends RuntimeException(message) { -} - -object deleteExecution { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - - def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = { - if (tableIdentifier.size > 1) { - TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0))) - } else { - TableIdentifier(tableIdentifier(0), None) - } - } - - def deleteDeltaExecution(identifier: Seq[String], - sqlContext: SQLContext, - dataRdd: RDD[Row], - timestamp: String, relation: CarbonRelation, isUpdateOperation: Boolean, - executorErrors: ExecutionErrors): Boolean = { - - var res: Array[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] = null - val tableName = getTableIdentifier(identifier).table - val database = getDB.getDatabaseName(getTableIdentifier(identifier).database, sqlContext) - val relation = CarbonEnv.get.carbonMetastore - .lookupRelation1(getTableIdentifier(identifier))(sqlContext). - asInstanceOf[CarbonRelation] - - val storeLocation = relation.tableMeta.storePath - val absoluteTableIdentifier: AbsoluteTableIdentifier = new - AbsoluteTableIdentifier(storeLocation, - relation.tableMeta.carbonTableIdentifier) - var tablePath = CarbonStorePath - .getCarbonTablePath(storeLocation, - absoluteTableIdentifier.getCarbonTableIdentifier()) - var tableUpdateStatusPath = tablePath.getTableUpdateStatusFilePath - val totalSegments = - SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath).length - var factPath = tablePath.getFactDir - - var carbonTable = relation.tableMeta.carbonTable - var deleteStatus = true - val deleteRdd = if (isUpdateOperation) { - val schema = - org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField( - CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, - org.apache.spark.sql.types.StringType))) - val rdd = dataRdd - .map(row => Row(row.get(row.fieldIndex( - CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)))) - sqlContext.createDataFrame(rdd, schema).rdd - } else { - dataRdd - } - - val (carbonInputFormat, job) = - QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier) - - val keyRdd = deleteRdd.map({ row => - val tupleId: String = row - .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) - val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId) - (key, row) - }).groupByKey() - - // if no loads are present then no need to do anything. 
- if (keyRdd.partitions.size == 0) { - return true - } - - var blockMappingVO = carbonInputFormat.getBlockRowCount(job, absoluteTableIdentifier) - val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier) - CarbonUpdateUtil - .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr) - - val rowContRdd = sqlContext.sparkContext - .parallelize(blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq, - keyRdd.partitions.size) - - val rdd = rowContRdd.join(keyRdd) - - res = rdd.mapPartitionsWithIndex( - (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) => - Iterator[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] { - - var result = List[(String, (SegmentUpdateDetails, ExecutionErrors))]() - while (records.hasNext) { - val ((key), (rowCountDetailsVO, groupedRows)) = records.next - result = result ++ - deleteDeltaFunc(index, - key, - groupedRows.toIterator, - timestamp, - rowCountDetailsVO) - - } - result - } - ).collect() - - // if no loads are present then no need to do anything. - if (res.isEmpty) { - return true - } - - // update new status file - checkAndUpdateStatusFiles - - // all or none : update status file, only if complete delete opeartion is successfull. - def checkAndUpdateStatusFiles: Unit = { - val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]() - val segmentDetails = new util.HashSet[String]() - res.foreach(resultOfSeg => resultOfSeg.foreach( - resultOfBlock => { - if (resultOfBlock._1.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)) { - blockUpdateDetailsList.add(resultOfBlock._2._1) - segmentDetails.add(resultOfBlock._2._1.getSegmentName) - // if this block is invalid then decrement block count in map. - if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getStatus)) { - CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1, - blockMappingVO.getSegmentNumberOfBlockMapping) - } - } - else { - deleteStatus = false - // In case of failure , clean all related delete delta files - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) - LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }") - val errorMsg = - "Delete data operation is failed due to failure in creating delete delta file for " + - "segment : " + resultOfBlock._2._1.getSegmentName + " block : " + - resultOfBlock._2._1.getBlockName - executorErrors.failureCauses = resultOfBlock._2._2.failureCauses - executorErrors.errorMsg = resultOfBlock._2._2.errorMsg - - if (executorErrors.failureCauses == FailureCauses.NONE) { - executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE - executorErrors.errorMsg = errorMsg - } - LOGGER.error(errorMsg) - return - } - } - ) - ) - - val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil - .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping) - - - - // this is delete flow so no need of putting timestamp in the status file. 
- if (CarbonUpdateUtil - .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) && - CarbonUpdateUtil - .updateTableMetadataStatus(segmentDetails, - carbonTable, - timestamp, - !isUpdateOperation, - listOfSegmentToBeMarkedDeleted) - ) { - LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }") - LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }") - } - else { - // In case of failure , clean all related delete delta files - CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) - - val errorMessage = "Delete data operation is failed due to failure " + - "in table status updation." - LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }") - LOGGER.error("Delete data operation is failed due to failure in table status updation.") - executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE - executorErrors.errorMsg = errorMessage - // throw new Exception(errorMessage) - } - } - - def deleteDeltaFunc(index: Int, - key: String, - iter: Iterator[Row], - timestamp: String, - rowCountDetailsVO: RowCountDetailsVO): - Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] = { - - val result = new DeleteDelataResultImpl() - var deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - // here key = segment/blockName - val blockName = CarbonUpdateUtil - .getBlockName( - CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1))) - val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0) - var deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName) - val resultIter = new Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] { - val segmentUpdateDetails = new SegmentUpdateDetails() - var TID = "" - var countOfRows = 0 - try { - while (iter.hasNext) { - val oneRow = iter.next - TID = oneRow - .get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString - val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET) - val blockletId = CarbonUpdateUtil - .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID) - val pageId = Integer.parseInt(CarbonUpdateUtil - .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID)) - val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId) - // stop delete operation - if(!IsValidOffset) { - executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING - executorErrors.errorMsg = "Multiple input rows matched for same row." 
- throw new MultipleMatchingException("Multiple input rows matched for same row.") - } - countOfRows = countOfRows + 1 - } - - val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath) - val completeBlockName = CarbonTablePath - .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) + - CarbonCommonConstants.FACT_FILE_EXT) - val deleteDeletaPath = CarbonUpdateUtil - .getDeleteDeltaFilePath(blockPath, blockName, timestamp) - val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeletaPath, - FileFactory.getFileType(deleteDeletaPath)) - - - - segmentUpdateDetails.setBlockName(blockName) - segmentUpdateDetails.setActualBlockName(completeBlockName) - segmentUpdateDetails.setSegmentName(segmentId) - segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp) - segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp) - - val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock - val totalDeletedRows: Long = alreadyDeletedRows + countOfRows - segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString) - if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) { - segmentUpdateDetails.setStatus(CarbonCommonConstants.MARKED_FOR_DELETE) - } - else { - // write the delta file - carbonDeleteWriter.write(deleteDeltaBlockDetails) - } - - deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS - } catch { - case e : MultipleMatchingException => - LOGGER.audit(e.getMessage) - LOGGER.error(e.getMessage) - // dont throw exception here. - case e: Exception => - val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }." - LOGGER.audit(errorMsg) - LOGGER.error(errorMsg + e.getMessage) - throw e - } - - - var finished = false - - override def hasNext: Boolean = { - if (!finished) { - finished = true - finished - } - else { - !finished - } - } - - override def next(): (String, (SegmentUpdateDetails, ExecutionErrors)) = { - finished = true - result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors)) - } - } - resultIter - } - true - } -} - - - -object UpdateExecution { - - def performUpdate( - dataFrame: DataFrame, - tableIdentifier: Seq[String], - plan: LogicalPlan, - sqlContext: SQLContext, - currentTime: Long, - executorErrors: ExecutionErrors): Unit = { - - def isDestinationRelation(relation: CarbonDatasourceRelation): Boolean = { - - val tableName = relation.getTable() - val dbName = relation.getDatabaseName() - (tableIdentifier.size > 1 && - tableIdentifier(0) == dbName && - tableIdentifier(1) == tableName) || - (tableIdentifier(0) == tableName) - } - def getHeader(relation: CarbonDatasourceRelation, plan: LogicalPlan): String = { - var header = "" - var found = false - - plan match { - case Project(pList, _) if (!found) => - found = true - header = pList - .filter(field => !field.name - .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) - .map(col => if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) { - col.name - .substring(0, col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)) - } - else { - col.name - }).mkString(",") - } - header - } - val ex = dataFrame.queryExecution.analyzed - val res = ex find { - case relation: LogicalRelation if (relation.relation.isInstanceOf[CarbonDatasourceRelation] && - isDestinationRelation(relation.relation - .asInstanceOf[CarbonDatasourceRelation])) => - true - case _ => false - } - val carbonRelation: CarbonDatasourceRelation = res match { - case Some(relation: LogicalRelation) => - 
relation.relation.asInstanceOf[CarbonDatasourceRelation] - case _ => sys.error("") - } - - val updateTableModel = UpdateTableModel(true, currentTime, executorErrors) - - val header = getHeader(carbonRelation, plan) - - - - LoadTable( - Some(carbonRelation.getDatabaseName()), - carbonRelation.getTable(), - null, - Seq(), - Map(("fileheader" -> header)), - false, - null, - Some(dataFrame), - Some(updateTableModel)).run(sqlContext) - - - executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg - executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses - - Seq.empty - - } - -}
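
For readers following the deleted IUDCommands.scala above: its scaladoc describes tryHorizontalCompaction as running update-delta compaction followed by delete-delta compaction after an UPDATE, but only delete-delta compaction after a DELETE, with the delete timestamp offset by one millisecond so the two phases never share a status-file timestamp. The following is a minimal, standalone Scala sketch of that decision flow only; every name in it (validSegments, compact, the CompactionType cases) is a simplified placeholder for illustration, not the actual CarbonData API used in the deleted file.

```scala
// Standalone sketch of the horizontal-compaction decision flow from the
// deleted tryHorizontalCompaction; all names are simplified placeholders,
// not real CarbonData APIs.
object HorizontalCompactionSketch {

  sealed trait CompactionType
  case object UpdateDeltaCompaction extends CompactionType
  case object DeleteDeltaCompaction extends CompactionType

  // Stand-in for fetching the valid segment list of a table.
  def validSegments(table: String): Seq[String] = Seq("0", "1")

  // Stand-in for running one compaction phase; returns true on success.
  def compact(table: String, tpe: CompactionType, factTimeStamp: Long): Boolean = {
    println(s"[$table] running $tpe at $factTimeStamp")
    true
  }

  // After an UPDATE: update-delta compaction first, then delete-delta
  // compaction. After a DELETE: only delete-delta compaction. The two
  // phases use distinct timestamps so their status entries and stale-file
  // cleanup do not collide.
  def tryHorizontalCompaction(table: String,
      isUpdateOperation: Boolean,
      compactionEnabled: Boolean): Unit = {
    if (!compactionEnabled) return

    val updateTimeStamp = System.currentTimeMillis()
    val deleteTimeStamp = updateTimeStamp + 1

    if (isUpdateOperation && validSegments(table).nonEmpty) {
      compact(table, UpdateDeltaCompaction, updateTimeStamp)
    }
    if (validSegments(table).nonEmpty) {
      compact(table, DeleteDeltaCompaction, deleteTimeStamp)
    }
  }

  def main(args: Array[String]): Unit = {
    tryHorizontalCompaction("db.t1", isUpdateOperation = true, compactionEnabled = true)
    tryHorizontalCompaction("db.t1", isUpdateOperation = false, compactionEnabled = true)
  }
}
```

In the real code the two phases are additionally gated on CarbonDataMergerUtil.getSegListIUDCompactionQualified, so either phase may be skipped when no segment qualifies even though valid segments exist; the sketch omits that filter for brevity.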
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala deleted file mode 100644 index 9814cc2..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ /dev/null @@ -1,1019 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.command - -import java.io.File - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer -import scala.language.implicitConversions - -import org.apache.commons.lang3.StringUtils -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan} -import org.apache.spark.sql.hive.CarbonMetastore -import org.apache.spark.sql.types.TimestampType -import org.apache.spark.util.{CausedBy, FileUtils} -import org.codehaus.jackson.map.ObjectMapper - -import org.apache.carbondata.api.CarbonStore -import org.apache.carbondata.common.constants.LoggerAction -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.dictionary.server.DictionaryServer -import org.apache.carbondata.core.exception.InvalidConfigurationException -import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier} -import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.metadata.schema.table.TableInfo -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension -import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} -import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.processing.exception.DataLoadingException -import 
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants -import org.apache.carbondata.processing.loading.exception.NoRetryException -import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} -import org.apache.carbondata.processing.util.TableOptionConstant -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.load.ValidateUtil -import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel} -import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil} - -object Checker { - def validateTableExists( - dbName: Option[String], - tableName: String, - sqlContext: SQLContext): Unit = { - val identifier = TableIdentifier(tableName, dbName) - if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(sqlContext)) { - val err = s"table $dbName.$tableName not found" - LogServiceFactory.getLogService(this.getClass.getName).error(err) - throw new IllegalArgumentException(err) - } - } -} - -/** - * Command for show table partitions Command - * - * @param tableIdentifier - */ -private[sql] case class ShowCarbonPartitionsCommand( - tableIdentifier: TableIdentifier) extends RunnableCommand { - val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName) - override val output = CommonUtil.partitionInfoOutput - override def run(sqlContext: SQLContext): Seq[Row] = { - val relation = CarbonEnv.get.carbonMetastore - .lookupRelation1(tableIdentifier)(sqlContext). - asInstanceOf[CarbonRelation] - val carbonTable = relation.tableMeta.carbonTable - var tableName = carbonTable.getFactTableName - var partitionInfo = carbonTable.getPartitionInfo( - carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName) - if (partitionInfo == null) { - throw new AnalysisException( - s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName") - } - var partitionType = partitionInfo.getPartitionType - var columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName - LOGGER.info("partition column name:" + columnName) - CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo) - } -} - -/** - * Command for the compaction in alter table command - * - * @param alterTableModel - */ -private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) extends - RunnableCommand { - - def run(sqlContext: SQLContext): Seq[Row] = { - // TODO : Implement it. - val tableName = alterTableModel.tableName - val databaseName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext) - if (null == CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)) { - logError(s"alter table failed. table not found: $databaseName.$tableName") - sys.error(s"alter table failed. 
table not found: $databaseName.$tableName") - } - - val relation = - CarbonEnv.get.carbonMetastore - .lookupRelation1(Option(databaseName), tableName)(sqlContext) - .asInstanceOf[CarbonRelation] - if (relation == null) { - sys.error(s"Table $databaseName.$tableName does not exist") - } - val carbonLoadModel = new CarbonLoadModel() - - - val table = relation.tableMeta.carbonTable - carbonLoadModel.setTableName(table.getFactTableName) - val dataLoadSchema = new CarbonDataLoadSchema(table) - // Need to fill dimension relation - carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName) - carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(relation.tableMeta.storePath) - - var storeLocation = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, - System.getProperty("java.io.tmpdir") - ) - storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() - try { - CarbonDataRDDFactory.alterTableForCompaction(sqlContext, - alterTableModel, - carbonLoadModel, - relation.tableMeta.storePath, - storeLocation - ) - } catch { - case e: Exception => - if (null != e.getMessage) { - sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }") - } else { - sys.error("Exception in compaction. Please check logs for more info.") - } - } - Seq.empty - } -} - -case class CreateTable(cm: TableModel) extends RunnableCommand { - - def run(sqlContext: SQLContext): Seq[Row] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sqlContext) - val tbName = cm.tableName - val dbName = cm.databaseName - LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]") - - val tableInfo: TableInfo = TableNewProcessor(cm) - - // Add validation for sort scope when create table - val sortScope = tableInfo.getFactTable.getTableProperties - .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) - if (!CarbonUtil.isValidSortOption(sortScope)) { - throw new InvalidConfigurationException(s"Passing invalid SORT_SCOPE '$sortScope'," + - s" valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ") - } - - if (tableInfo.getFactTable.getListOfColumns.isEmpty) { - sys.error("No Dimensions found. Table should have at least one dimesnion !") - } - - if (sqlContext.tableNames(dbName).exists(_.equalsIgnoreCase(tbName))) { - if (!cm.ifNotExistsSet) { - LOGGER.audit( - s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " + - s"Table [$tbName] already exists under database [$dbName]") - sys.error(s"Table [$tbName] already exists under database [$dbName]") - } - } else { - // Add Database to catalog and persist - val catalog = CarbonEnv.get.carbonMetastore - // Need to fill partitioner class when we support partition - val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName, null)(sqlContext) - try { - sqlContext.sql( - s"""CREATE TABLE $dbName.$tbName USING carbondata""" + - s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """) - .collect - } catch { - case e: Exception => - val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName)) - // call the drop table to delete the created table. 
- - CarbonEnv.get.carbonMetastore - .dropTable(catalog.storePath, identifier)(sqlContext) - - LOGGER.audit(s"Table creation with Database name [$dbName] " + - s"and Table name [$tbName] failed") - throw e - } - - LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]") - } - - Seq.empty - } - - def setV(ref: Any, name: String, value: Any): Unit = { - ref.getClass.getFields.find(_.getName == name).get - .set(ref, value.asInstanceOf[AnyRef]) - } -} - -private[sql] case class DeleteLoadsById( - loadids: Seq[String], - databaseNameOp: Option[String], - tableName: String) extends RunnableCommand { - - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - def run(sqlContext: SQLContext): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sqlContext) - val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp, - tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable - CarbonStore.deleteLoadById( - loadids, - getDB.getDatabaseName(databaseNameOp, sqlContext), - tableName, - carbonTable - ) - Seq.empty - - } - - // validates load ids - private def validateLoadIds: Unit = { - if (loadids.isEmpty) { - val errorMessage = "Error: Segment id(s) should not be empty." - throw new MalformedCarbonCommandException(errorMessage) - - } - } -} - -private[sql] case class DeleteLoadsByLoadDate( - databaseNameOp: Option[String], - tableName: String, - dateField: String, - loadDate: String) extends RunnableCommand { - - val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema") - - def run(sqlContext: SQLContext): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sqlContext) - val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp, - tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable - CarbonStore.deleteLoadByDate( - loadDate, - getDB.getDatabaseName(databaseNameOp, sqlContext), - tableName, - carbonTable - ) - Seq.empty - - } - -} - -object LoadTable { - - def updateTableMetadata(carbonLoadModel: CarbonLoadModel, - sqlContext: SQLContext, - model: DictionaryLoadModel, - noDictDimension: Array[CarbonDimension]): Unit = { - - val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation, - model.table) - val schemaFilePath = carbonTablePath.getSchemaFilePath - - // read TableInfo - val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath) - - // modify TableInfo - val columns = tableInfo.getFact_table.getTable_columns - for (i <- 0 until columns.size) { - if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) { - columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY) - } - } - - // write TableInfo - CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo) - - // update Metadata - val catalog = CarbonEnv.get.carbonMetastore - catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo, - model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath) - - // update CarbonDataLoadSchema - val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName), - model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable - carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) - } - -} - -private[sql] case class LoadTableByInsert(relation: CarbonDatasourceRelation, - child: LogicalPlan, isOverwriteExist: Boolean) extends 
RunnableCommand { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - def run(sqlContext: SQLContext): Seq[Row] = { - val df = new DataFrame(sqlContext, child) - val header = relation.carbonRelation.output.map(_.name).mkString(",") - val load = LoadTable( - Some(relation.carbonRelation.databaseName), - relation.carbonRelation.tableName, - null, - Seq(), - scala.collection.immutable.Map("fileheader" -> header), - isOverwriteExist, - null, - Some(df)).run(sqlContext) - // updating relation metadata. This is in case of auto detect high cardinality - relation.carbonRelation.metaData = - CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable) - load - } -} -case class LoadTable( - databaseNameOp: Option[String], - tableName: String, - factPathFromUser: String, - dimFilesPath: Seq[DataLoadTableFileMapping], - options: scala.collection.immutable.Map[String, String], - isOverwriteExist: Boolean, - var inputSqlString: String = null, - dataFrame: Option[DataFrame] = None, - updateModel: Option[UpdateTableModel] = None) extends RunnableCommand { - - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - private def checkDefaultValue(value: String, default: String) = if (StringUtils.isEmpty(value)) { - default - } else { - value - } - - def run(sqlContext: SQLContext): Seq[Row] = { - if (dataFrame.isDefined && !updateModel.isDefined) { - val rdd = dataFrame.get.rdd - if (rdd.partitions == null || rdd.partitions.length == 0) { - LOGGER.warn("DataLoading finished. No data was loaded.") - return Seq.empty - } - } - - val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext) - if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) { - logError(s"Data loading failed. table not found: $dbName.$tableName") - LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName") - sys.error(s"Data loading failed. table not found: $dbName.$tableName") - } - - val relation = CarbonEnv.get.carbonMetastore - .lookupRelation1(Option(dbName), tableName)(sqlContext) - .asInstanceOf[CarbonRelation] - if (relation == null) { - sys.error(s"Table $dbName.$tableName does not exist") - } - CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false") - val carbonLock = CarbonLockFactory - .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier - .getCarbonTableIdentifier, - LockUsage.METADATA_LOCK - ) - try { - // take lock only in case of normal data load. - if (!updateModel.isDefined) { - if (carbonLock.lockWithRetries()) { - logInfo("Successfully able to get the table metadata file lock") - } else { - sys.error("Table is locked for updation. 
Please try after some time") - } - } - - val factPath = if (dataFrame.isDefined) { - "" - } else { - FileUtils.getPaths( - CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser)) - } - val carbonLoadModel = new CarbonLoadModel() - carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName) - carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(relation.tableMeta.storePath) - - val table = relation.tableMeta.carbonTable - carbonLoadModel.setTableName(table.getFactTableName) - val dataLoadSchema = new CarbonDataLoadSchema(table) - // Need to fill dimension relation - carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - - val partitionLocation = relation.tableMeta.storePath + "/partition/" + - relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" + - relation.tableMeta.carbonTableIdentifier.getTableName + "/" - - - val columnar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean - - val delimiter = options.getOrElse("delimiter", ",") - val quoteChar = options.getOrElse("quotechar", "\"") - var fileHeader = options.getOrElse("fileheader", "") - val escapeChar = options.getOrElse("escapechar", "\\") - val commentchar = options.getOrElse("commentchar", "#") - val columnDict = options.getOrElse("columndict", null) - val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N") - val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false") - val badRecordActionValue = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) - val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue) - val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false") - val allDictionaryPath = options.getOrElse("all_dictionary_path", "") - val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$") - val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:") - val dateFormat = options.getOrElse("dateformat", null) - ValidateUtil.validateDateFormat(dateFormat, table, tableName) - val maxColumns = options.getOrElse("maxcolumns", null) - val tableProperties = table.getTableInfo.getFactTable.getTableProperties - val sortScopeDefault = CarbonProperties.getInstance(). 
- getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, - CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) - val sortScope = if (null == tableProperties) { - sortScopeDefault - } else { - tableProperties.getOrDefault("sort_scope", sortScopeDefault) - } - - ValidateUtil.validateSortScope(table, sortScope) - val carbonProperty: CarbonProperties = CarbonProperties.getInstance() - val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, - carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))) - val globalSortPartitions = options.getOrElse("global_sort_partitions", carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)) - ValidateUtil.validateGlobalSortPartitions(globalSortPartitions) - - // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option, - // we should use table schema to generate file header. - val headerOption = options.get("header") - if (headerOption.isDefined) { - // whether the csv file has file header - // the default value is true - val header = try { - headerOption.get.toBoolean - } catch { - case ex: IllegalArgumentException => - throw new MalformedCarbonCommandException( - "'header' option should be either 'true' or 'false'. " + ex.getMessage) - } - header match { - case true => - if (fileHeader.nonEmpty) { - throw new MalformedCarbonCommandException( - "When 'header' option is true, 'fileheader' option is not required.") - } - case false => - // generate file header - if (fileHeader.isEmpty) { - fileHeader = table.getCreateOrderColumn(table.getFactTableName) - .asScala.map(_.getColName).mkString(",") - } - } - } - - val bad_record_path = options.getOrElse("bad_record_path", - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) - if (badRecordsLoggerEnable.toBoolean || - LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) { - if (!CarbonUtil.isValidBadStorePath(bad_record_path)) { - sys.error("Invalid bad records location.") - } - } - carbonLoadModel.setBadRecordsLocation(bad_record_path) - carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\")) - carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) - carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#")) - carbonLoadModel.setDateFormat(dateFormat) - carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, - CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) - carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_DATE_FORMAT, - CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) - carbonLoadModel - .setSerializationNullFormat( - TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat) - carbonLoadModel - .setBadRecordsLoggerEnable( - TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable) - carbonLoadModel - .setBadRecordsAction( - TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction) - carbonLoadModel - .setIsEmptyDataBadRecord( - DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord) - 
carbonLoadModel.setSortScope(sortScope) - carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB) - carbonLoadModel.setGlobalSortPartitions(globalSortPartitions) - // when single_pass=true, and not use all dict - val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match { - case "true" => - true - case "false" => - // when single_pass = false and if either alldictionary - // or columnDict is configured the do not allow load - if (StringUtils.isNotEmpty(allDictionaryPath) || StringUtils.isNotEmpty(columnDict)) { - throw new MalformedCarbonCommandException( - "Can not use all_dictionary_path or columndict without single_pass.") - } else { - false - } - case illegal => - LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " + - "Please set it as 'true' or 'false'") - false - } - carbonLoadModel.setUseOnePass(useOnePass) - - if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) || - complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) || - delimiter.equalsIgnoreCase(complex_delimiter_level_2)) { - sys.error(s"Field Delimiter & Complex types delimiter are same") - } - else { - carbonLoadModel.setComplexDelimiterLevel1( - CarbonUtil.delimiterConverter(complex_delimiter_level_1)) - carbonLoadModel.setComplexDelimiterLevel2( - CarbonUtil.delimiterConverter(complex_delimiter_level_2)) - } - // set local dictionary path, and dictionary file extension - carbonLoadModel.setAllDictPath(allDictionaryPath) - - val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS - - try { - // First system has to partition the data first and then call the load data - LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") - carbonLoadModel.setFactFilePath(factPath) - carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter)) - carbonLoadModel.setCsvHeader(fileHeader) - carbonLoadModel.setColDictFilePath(columnDict) - carbonLoadModel.setDirectLoad(true) - carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) - val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns, - maxColumns) - carbonLoadModel.setMaxColumns(validatedMaxColumns.toString) - GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata - val storePath = relation.tableMeta.storePath - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier - .getCarbonTableIdentifier - val carbonTablePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier) - val dictFolderPath = carbonTablePath.getMetadataDirectoryPath - val dimensions = carbonTable.getDimensionByTableName( - carbonTable.getFactTableName).asScala.toArray - // add the start entry for the new load in the table status file - if (!updateModel.isDefined) { - CommonUtil. 
- readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteExist) - } - if (isOverwriteExist) { - LOGGER.info(s"Overwrite is in progress for carbon table with $dbName.$tableName") - } - if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel) - } - if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass && - StringUtils.isEmpty(columnDict) && StringUtils.isEmpty(allDictionaryPath)) { - LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load") - LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load") - carbonLoadModel.setUseOnePass(false) - } - if (carbonLoadModel.getUseOnePass) { - val colDictFilePath = carbonLoadModel.getColDictFilePath - if (!StringUtils.isEmpty(colDictFilePath)) { - carbonLoadModel.initPredefDictMap() - // generate predefined dictionary - GlobalDictionaryUtil - .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier, - dimensions, carbonLoadModel, sqlContext, storePath, dictFolderPath) - } - val allDictPath: String = carbonLoadModel.getAllDictPath - if(!StringUtils.isEmpty(allDictPath)) { - carbonLoadModel.initPredefDictMap() - GlobalDictionaryUtil - .generateDictionaryFromDictionaryFiles(sqlContext, - carbonLoadModel, - storePath, - carbonTableIdentifier, - dictFolderPath, - dimensions, - allDictionaryPath) - } - // dictionaryServerClient dictionary generator - val dictionaryServerPort = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT, - CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT) - val sparkDriverHost = sqlContext.sparkContext.getConf.get("spark.driver.host") - carbonLoadModel.setDictionaryServerHost(sparkDriverHost) - // start dictionary server when use one pass load and dimension with DICTIONARY - // encoding is present. 
- val allDimensions = table.getAllDimensions.asScala.toList - val createDictionary = allDimensions.exists { - carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) && - !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) - } - val server: Option[DictionaryServer] = if (createDictionary) { - val dictionaryServer = DictionaryServer - .getInstance(dictionaryServerPort.toInt, carbonTable) - carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort) - sqlContext.sparkContext.addSparkListener(new SparkListener() { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - dictionaryServer.shutdown() - } - }) - Some(dictionaryServer) - } else { - None - } - CarbonDataRDDFactory.loadCarbonData(sqlContext, - carbonLoadModel, - relation.tableMeta.storePath, - columnar, - partitionStatus, - server, - isOverwriteExist, - dataFrame, - updateModel) - } else { - val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) { - val fields = dataFrame.get.schema.fields - import org.apache.spark.sql.functions.udf - // extracting only segment from tupleId - val getSegIdUDF = udf((tupleId: String) => - CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID)) - // getting all fields except tupleId field as it is not required in the value - var otherFields = fields.toSeq - .filter(field => !field.name - .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) - .map(field => { - if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) { - new Column(field.name - .substring(0, - field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))) - } else { - - new Column(field.name) - } - }) - - // extract tupleId field which will be used as a key - val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute - .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))). - as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) - // use dataFrameWithoutTupleId as dictionaryDataFrame - val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*) - otherFields = otherFields :+ segIdColumn - // use dataFrameWithTupleId as loadDataFrame - val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*) - (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId)) - } else { - (dataFrame, dataFrame) - } - GlobalDictionaryUtil - .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath, - dictionaryDataFrame) - CarbonDataRDDFactory.loadCarbonData(sqlContext, - carbonLoadModel, - relation.tableMeta.storePath, - columnar, - partitionStatus, - None, - isOverwriteExist, - loadDataFrame, - updateModel) - } - } catch { - case CausedBy(ex: NoRetryException) => - LOGGER.error(ex, s"Dataload failure for $dbName.$tableName") - throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}") - case ex: Exception => - LOGGER.error(ex) - LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs") - throw ex - } finally { - // Once the data load is successful delete the unwanted partition files - try { - val fileType = FileFactory.getFileType(partitionLocation) - if (FileFactory.isFileExist(partitionLocation, fileType)) { - val file = FileFactory - .getCarbonFile(partitionLocation, fileType) - CarbonUtil.deleteFoldersAndFiles(file) - } - } catch { - case ex: Exception => - LOGGER.error(ex) - LOGGER.audit(s"Dataload failure for $dbName.$tableName. 
" + - "Problem deleting the partition folder") - throw ex - } - - } - } catch { - case dle: DataLoadingException => - LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage) - throw dle - case mce: MalformedCarbonCommandException => - LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage) - throw mce - } finally { - if (carbonLock != null) { - if (carbonLock.unlock()) { - logInfo("Table MetaData Unlocked Successfully after data load") - } else { - logError("Unable to unlock Table MetaData") - } - } - } - Seq.empty - } - - private def updateTableMetadata(carbonLoadModel: CarbonLoadModel, - sqlContext: SQLContext, - model: DictionaryLoadModel, - noDictDimension: Array[CarbonDimension]): Unit = { - - val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation, - model.table) - val schemaFilePath = carbonTablePath.getSchemaFilePath - - // read TableInfo - val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath) - - // modify TableInfo - val columns = tableInfo.getFact_table.getTable_columns - for (i <- 0 until columns.size) { - if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) { - columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY) - } - } - - // write TableInfo - CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo) - - - val catalog = CarbonEnv.get.carbonMetastore - - // upate the schema modified time - catalog.updateSchemasUpdatedTime(catalog.touchSchemaFileSystemTime( - carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName)) - - // update Metadata - catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo, - model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath) - - // update CarbonDataLoadSchema - val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName), - model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable - carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) - } - -} - -private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: Option[String], - tableName: String) - extends RunnableCommand { - - def run(sqlContext: SQLContext): Seq[Row] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext) - val identifier = TableIdentifier(tableName, Option(dbName)) - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") - val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) - val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() - val catalog = CarbonEnv.get.carbonMetastore - val storePath = catalog.storePath - try { - locksToBeAcquired foreach { - lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock) - } - LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") - CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sqlContext) - LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]") - } catch { - case ex: Exception => - LOGGER.error(ex, s"Dropping table $dbName.$tableName failed") - sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}") - } finally { - if (carbonLocks.nonEmpty) { - val unlocked = carbonLocks.forall(_.unlock()) - if (unlocked) { - logInfo("Table MetaData Unlocked Successfully") - // deleting any remaining files. 
- val metadataFilePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath - val fileType = FileFactory.getFileType(metadataFilePath) - if (FileFactory.isFileExist(metadataFilePath, fileType)) { - val file = FileFactory.getCarbonFile(metadataFilePath, fileType) - CarbonUtil.deleteFoldersAndFiles(file.getParentFile) - } - } - } - } - Seq.empty - } -} - -private[sql] case class ShowLoads( - databaseNameOp: Option[String], - tableName: String, - limit: Option[String], - override val output: Seq[Attribute]) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sqlContext) - val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp, - tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable - CarbonStore.showSegments( - getDB.getDatabaseName(databaseNameOp, sqlContext), - tableName, - limit, - carbonTable.getMetaDataFilepath - ) - } -} - -private[sql] case class DescribeCommandFormatted( - child: SparkPlan, - override val output: Seq[Attribute], - tblIdentifier: TableIdentifier) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - val relation = CarbonEnv.get.carbonMetastore - .lookupRelation1(tblIdentifier)(sqlContext).asInstanceOf[CarbonRelation] - val mapper = new ObjectMapper() - val colProps = StringBuilder.newBuilder - var results: Seq[(String, String, String)] = child.schema.fields.map { field => - val comment = if (relation.metaData.dims.contains(field.name)) { - val dimension = relation.metaData.carbonTable.getDimensionByName( - relation.tableMeta.carbonTableIdentifier.getTableName, - field.name) - if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) { - colProps.append(field.name).append(".") - .append(mapper.writeValueAsString(dimension.getColumnProperties)) - .append(",") - } - if (dimension.hasEncoding(Encoding.DICTIONARY)) { - "DICTIONARY, KEY COLUMN" + (dimension.hasEncoding(Encoding.INVERTED_INDEX) match { - case false => ",NOINVERTEDINDEX" - case _ => "" - }) - } else { - "KEY COLUMN" + (dimension.hasEncoding(Encoding.INVERTED_INDEX) match { - case false => ",NOINVERTEDINDEX" - case _ => "" - }) - } - } else { - "MEASURE" - } - (field.name, field.dataType.simpleString, comment) - } - val colPropStr = if (colProps.toString().trim().length() > 0) { - // drops additional comma at endpom - colProps.toString().dropRight(1) - } else { - colProps.toString() - } - results ++= Seq(("", "", ""), ("##Detailed Table Information", "", "")) - results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier - .getDatabaseName, "") - ) - results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, "")) - results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, "")) - val carbonTable = relation.tableMeta.carbonTable - results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", "")) - results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable - .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants - .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) - results ++= Seq(("", "", ""), ("##Detailed Column property", "", "")) - if (colPropStr.length() > 0) { - results ++= Seq((colPropStr, "", "")) - } else { - results ++= Seq(("ADAPTIVE", "", "")) - } - results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns( - 
relation.tableMeta.carbonTableIdentifier.getTableName).asScala - .map(column => column).mkString(","), "")) - val dimension = carbonTable - .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName) - results ++= getColumnGroups(dimension.asScala.toList) - if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) { - results ++= - Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName) - .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), "")) - } - results.map { case (name, dataType, comment) => - Row(f"$name%-36s $dataType%-80s $comment%-72s") - } - } - - private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = { - var results: Seq[(String, String, String)] = - Seq(("", "", ""), ("##Column Group Information", "", "")) - val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter { - case (groupId, _) => groupId != -1 - }.toSeq.sortBy(_._1) - val groups = groupedDimensions.map(colGroups => { - colGroups._2.map(dim => dim.getColName).mkString(", ") - }) - var index = 1 - groups.foreach { x => - results = results :+ (s"Column Group $index", x, "") - index = index + 1 - } - results - } -} - -private[sql] case class DeleteLoadByDate( - databaseNameOp: Option[String], - tableName: String, - dateField: String, - dateValue: String -) extends RunnableCommand { - - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - def run(sqlContext: SQLContext): Seq[Row] = { - val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext) - LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName") - val identifier = TableIdentifier(tableName, Option(dbName)) - val relation = CarbonEnv.get.carbonMetastore - .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation] - var level: String = "" - val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName) - if (relation == null) { - LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist") - sys.error(s"Table $dbName.$tableName does not exist") - } - val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter( - filter => filter.name.equalsIgnoreCase(dateField) && - filter.dataType.isInstanceOf[TimestampType]).toList - if (matches.isEmpty) { - LOGGER.audit("The delete load by date is failed. 
" + - s"Table $dbName.$tableName does not contain date field: $dateField") - sys.error(s"Table $dbName.$tableName does not contain date field $dateField") - } else { - level = matches.asJava.get(0).name - } - val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level) - .getColName - DataManagementFunc.deleteLoadByDate( - sqlContext, - new CarbonDataLoadSchema(carbonTable), - dbName, - tableName, - CarbonEnv.get.carbonMetastore.storePath, - level, - actualColName, - dateValue) - LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.") - Seq.empty - } - -} - -private[sql] case class CleanFiles( - databaseNameOp: Option[String], - tableName: String) extends RunnableCommand { - - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - def run(sqlContext: SQLContext): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sqlContext) - val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp, - tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable - CarbonStore.cleanFiles( - getDB.getDatabaseName(databaseNameOp, sqlContext), - tableName, - sqlContext.asInstanceOf[CarbonContext].storePath, - carbonTable, - false) - Seq.empty - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala deleted file mode 100644 index d23b18f..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.hive - -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit -import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.Alias -import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.execution.command.ProjectForDeleteCommand -import org.apache.spark.sql.execution.datasources.LogicalRelation - -import org.apache.carbondata.core.constants.CarbonCommonConstants - -/** - * Insert into carbon table from other source - */ -object CarbonPreInsertionCasts extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan.transform { - // Wait until children are resolved. - case p: LogicalPlan if !p.childrenResolved => p - - case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _) - if relation.relation.isInstanceOf[CarbonDatasourceRelation] => - castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child) - } - - def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan) - : LogicalPlan = { - if (relation.carbonRelation.output.size > CarbonCommonConstants - .DEFAULT_MAX_NUMBER_OF_COLUMNS) { - sys - .error("Maximum supported column by carbon is:" + CarbonCommonConstants - .DEFAULT_MAX_NUMBER_OF_COLUMNS - ) - } - if (child.output.size >= relation.carbonRelation.output.size ) { - InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists) - } else { - sys.error("Cannot insert into target table because column number are different") - } - } -} - - -object CarbonIUDAnalysisRule extends Rule[LogicalPlan] { - - var sqlContext: SQLContext = _ - - def init(sqlContext: SQLContext) { - this.sqlContext = sqlContext - } - - private def processUpdateQuery( - table: UnresolvedRelation, - columns: List[String], - selectStmt: String, - filter: String): LogicalPlan = { - var includedDestColumns = false - var includedDestRelation = false - var addedTupleId = false - - def prepareTargetReleation(relation: UnresolvedRelation): Subquery = { - val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", - Seq.empty, isDistinct = false), "tupleId")()) - val projList = Seq( - UnresolvedAlias(UnresolvedStar(table.alias)), tupleId) - // include tuple id and rest of the required columns in subqury - Subquery(table.alias.getOrElse(""), Project(projList, relation)) - } - // get the un-analyzed logical plan - val targetTable = prepareTargetReleation(table) - val selectPlan = org.apache.spark.sql.SQLParser.parse(selectStmt, sqlContext) transform { - case Project(projectList, child) if (!includedDestColumns) => - includedDestColumns = true - if (projectList.size != columns.size) { - sys.error("Number of source and destination columns are not matching") - } - val renamedProjectList = projectList.zip(columns).map{ case(attr, col) => - attr match { - case UnresolvedAlias(child) => - UnresolvedAlias(Alias(child, col + "-updatedColumn")()) - case _ => attr - } - } - val list = Seq( - UnresolvedAlias(UnresolvedStar(table.alias))) ++ renamedProjectList - Project(list, child) - case Filter(cond, child) if (!includedDestRelation) => - includedDestRelation = true - Filter(cond, Join(child, targetTable, Inner, None)) - case r @ UnresolvedRelation(t, a) 
if (!includedDestRelation && - t != table.tableIdentifier) => - includedDestRelation = true - Join(r, targetTable, Inner, None) - } - val updatedSelectPlan = if (!includedDestRelation) { - // special case to handle self join queries - // Eg. update tableName SET (column1) = (column1+1) - selectPlan transform { - case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier && - addedTupleId == false) => - addedTupleId = true - targetTable - } - } else { - selectPlan - } - val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") - var transformed: Boolean = false - // Create a dummy projection to include filter conditions - SQLParser.parse("select * from " + - table.tableIdentifier.mkString(".") + " " + alias + " " + filter, sqlContext) transform { - case UnresolvedRelation(t, Some(a)) if ( - !transformed && t == table.tableIdentifier && a == alias) => - transformed = true - // Add the filter condition of update statement on destination table - Subquery(alias, updatedSelectPlan) - } - } else { - updatedSelectPlan - } - val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier) - val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table) - val destinationTable = UnresolvedRelation(tidSeq, table.alias) - ProjectForUpdate(destinationTable, columns, Seq(finalPlan)) - } - - def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = { - val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier) - val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table) - var addedTupleId = false - val selectPlan = SQLParser.parse(selectStmt, sqlContext) transform { - case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier && - addedTupleId == false) => - addedTupleId = true - val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", - Seq.empty, isDistinct = false), "tupleId")()) - val projList = Seq( - UnresolvedAlias(UnresolvedStar(table.alias)), tupleId) - // include tuple id in subqury - Project(projList, relation) - } - ProjectForDeleteCommand( - selectPlan, - tidSeq, - System.currentTimeMillis().toString) - } - - override def apply(logicalplan: LogicalPlan): LogicalPlan = { - - logicalplan transform { - case UpdateTable(t, cols, sel, where) => processUpdateQuery(t, cols, sel, where) - case DeleteRecords(statement, table) => processDeleteRecordsQuery(statement, table) - } - } -}
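For context, a minimal sketch of the statements the deleted CarbonIUDAnalysisRule handled may help readers of this diff. It is illustrative only: the table and column names (emp, salary, dept) and the store path are hypothetical, and it assumes the Spark 1.x CarbonContext entry point with Carbon's SQL parser installed, so that UPDATE/DELETE statements are parsed into the UpdateTable and DeleteRecords plans that processUpdateQuery and processDeleteRecordsQuery rewrite.

```scala
// Illustrative sketch only; names and paths are assumptions, not part of this commit.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.CarbonContext

object IUDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("carbon-iud-example"))
    // Store path is a placeholder; use the store configured for your deployment.
    val cc = new CarbonContext(sc, "/tmp/carbon.store")

    // UPDATE: parsed into UpdateTable(...); processUpdateQuery injects the implicit
    // tupleId projection, joins with the target table, and wraps the result in
    // a ProjectForUpdate plan.
    cc.sql("UPDATE emp SET (salary) = (salary + 100) WHERE dept = 'IT'")

    // DELETE: parsed into DeleteRecords(...); processDeleteRecordsQuery adds the
    // getTupleId() column to the select plan and emits ProjectForDeleteCommand
    // with the current timestamp.
    cc.sql("DELETE FROM emp WHERE dept = 'HR'")

    sc.stop()
  }
}
```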