http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 3949404..184ab9e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -16,8 +16,28 @@
  */
 package org.apache.spark.util
 
+import java.io.{File, IOException}
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.CarbonInputSplit
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.util.CommonUtil
+
 object PartitionUtils {
 
   def getListInfo(originListInfo: String): List[List[String]] = {
@@ -49,4 +69,119 @@ object PartitionUtils {
     }
     listInfo.toList
   }
+
+  /**
+   * Verify the add/split request and update the partitionInfo in place:
+   * 1. update rangeInfo/listInfo
+   * 2. update partitionIds
+   *
+   * @param partitionInfo      partition info of the table; it is mutated by this call
+   * @param partitionIdList    partition ids before the change
+   * @param partitionId        id of the partition to split; 0 appends splitInfo as new
+   *                           partitions (add partition)
+   * @param splitInfo          new range bounds or list values
+   * @param timestampFormatter handed to CommonUtil.validateRangeInfo
+   * @param dateFormatter      handed to CommonUtil.validateRangeInfo
+   * @throws IllegalArgumentException if a non-zero partitionId is not in partitionIdList
+   */
+  def updatePartitionInfo(partitionInfo: PartitionInfo, partitionIdList: List[Int],
+      partitionId: Int, splitInfo: List[String], timestampFormatter: SimpleDateFormat,
+      dateFormatter: SimpleDateFormat): Unit = {
+    val columnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
+    val index = partitionIdList.indexOf(partitionId)
+    // indexOf returns -1 for an unknown id; the take/drop arithmetic below would then
+    // build a wrong layout instead of failing, so reject it up front.
+    if (partitionId != 0 && index < 0) {
+      throw new IllegalArgumentException("Invalid Partition Id " + partitionId)
+    }
+    if (partitionInfo.getPartitionType == PartitionType.RANGE) {
+      val rangeInfo = partitionInfo.getRangeInfo.asScala.toList
+      val newRangeInfo = mergeSplitInfo(rangeInfo, splitInfo, partitionId, index)
+      CommonUtil.validateRangeInfo(newRangeInfo, columnDataType,
+        timestampFormatter, dateFormatter)
+      partitionInfo.setRangeInfo(newRangeInfo.asJava)
+    } else if (partitionInfo.getPartitionType == PartitionType.LIST) {
+      val originList = partitionInfo.getListInfo.asScala.map(_.asScala.toList).toList
+      if (partitionId != 0) {
+        val targetListInfo = partitionInfo.getListInfo.get(index - 1)
+        CommonUtil.validateSplitListInfo(targetListInfo.asScala.toList, splitInfo, originList)
+      } else {
+        CommonUtil.validateAddListInfo(splitInfo, originList)
+      }
+      val addListInfo = PartitionUtils.getListInfo(splitInfo.mkString(","))
+      val newListInfo = mergeSplitInfo(originList, addListInfo, partitionId, index)
+      partitionInfo.setListInfo(newListInfo.map(_.asJava).asJava)
+    }
+
+    if (partitionId == 0) {
+      partitionInfo.addPartition(splitInfo.size)
+    } else {
+      partitionInfo.splitPartition(index, splitInfo.size)
+    }
+  }
+
+  /**
+   * Append `added` when partitionId is 0; otherwise replace the single entry at
+   * position `index - 1` of `origin` with `added` (the range/list info is indexed one
+   * lower than the partition id list).
+   */
+  private def mergeSplitInfo[T](origin: List[T], added: List[T],
+      partitionId: Int, index: Int): List[T] = {
+    if (partitionId == 0) {
+      origin ++ added
+    } else {
+      origin.take(index - 1) ++ added ++ origin.drop(index)
+    }
+  }
+
+  /**
+   * Used for alter table partition commands to get segmentProperties in spark node
+   * @param identifier table identifier handed to the carbon input format
+   * @param segmentId segment whose blocks are listed
+   * @param partitionIds partition ids handed to the carbon input format
+   * @param oldPartitionIdList Task id group before partition info is changed
+   * @param partitionInfo partition info used when computing the splits
+   * @return segment properties built from the footer of the first block found
+   */
+  def getSegmentProperties(identifier: AbsoluteTableIdentifier, segmentId: String,
+      partitionIds: List[String], oldPartitionIdList: List[Int],
+      partitionInfo: PartitionInfo): SegmentProperties = {
+    val tableBlockInfoList =
+      getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIdList, partitionInfo)
+    // NOTE(review): get(0) fails on an empty block list; presumably callers only pass
+    // segments that hold data for these partitions - confirm.
+    val footer = CarbonUtil.readMetadatFile(tableBlockInfoList.get(0))
+    new SegmentProperties(footer.getColumnInTable, footer.getSegmentInfo.getColumnCardinality)
+  }
+
+  /**
+   * Compute the splits of one segment through the carbon table input format and
+   * convert them to block infos.
+   */
+  def getPartitionBlockList(identifier: AbsoluteTableIdentifier, segmentId: String,
+      partitionIds: List[String], oldPartitionIdList: List[Int],
+      partitionInfo: PartitionInfo): java.util.List[TableBlockInfo] = {
+    val jobConf = new JobConf(new Configuration)
+    val job = new Job(jobConf)
+    val format = CarbonInputFormatUtil
+      .createCarbonTableInputFormat(identifier, partitionIds.asJava, job)
+    val splits = format.getSplitsOfOneSegment(job, segmentId,
+      oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
+    val blockList = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+    CarbonInputSplit.createBlocks(blockList.asJava)
+  }
+
+  /**
+   * Delete the carbondata files, and the index files derived from them, of the listed
+   * blocks whose file-name timestamp differs from the load model's fact timestamp.
+   */
+  @throws(classOf[IOException])
+  def deleteOriginalCarbonFile(identifier: AbsoluteTableIdentifier, segmentId: String,
+      partitionIds: List[String], oldPartitionIdList: List[Int], storePath: String,
+      dbName: String, tableName: String, partitionInfo: PartitionInfo,
+      carbonLoadModel: CarbonLoadModel): Unit = {
+    val newTime = carbonLoadModel.getFactTimeStamp
+    val tableBlockInfoList =
+      getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIdList,
+        partitionInfo).asScala
+    // insertion-ordered set: one index file can relate to multiple data files, and a
+    // set removes the duplicates without a linear contains() per path
+    val pathSet = new util.LinkedHashSet[String]()
+    val carbonTablePath = new CarbonTablePath(storePath, dbName, tableName)
+    tableBlockInfoList.foreach { tableBlockInfo =>
+      val path = tableBlockInfo.getFilePath
+      val timestamp = CarbonTablePath.DataFileUtil.getTimeStampFromFileName(path)
+      if (timestamp.toLong != newTime) {
+        // add carbondata file
+        pathSet.add(path)
+        // add index file
+        val version = tableBlockInfo.getVersion
+        val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path)
+        val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
+        val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
+        val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
+        val indexFilePath = carbonTablePath.getCarbonIndexFilePath(String.valueOf(taskId), "0",
+          segmentId, batchNo, String.valueOf(bucketNumber), timestamp, version)
+        if (indexFilePath != null) {
+          pathSet.add(indexFilePath)
+        }
+      }
+    }
+    CarbonUtil.deleteFiles(pathSet.asScala.toArray.map(new File(_)))
+  }
 }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5ede835..f556a05 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD, UpdateCoalescedRDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
@@ -47,23 +47,23 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException}
 import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.exception.{BadRecordFoundException, CarbonDataLoadingException}
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark._
+import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
 import org.apache.carbondata.spark.load.{FailureCauses, _}
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
@@ -178,6 +178,26 @@ object CarbonDataRDDFactory {
     }
   }
 
+  /**
+   * Data-processing entry point of alter table add/split partition: audits the request
+   * and runs startSplitThreads, logging and rethrowing any exception it raises.
+   *
+   * @param partitionId        id of the partition to split, as a string
+   * @param oldPartitionIdList partition ids before the partition info was changed
+   */
+  def alterTableSplitPartition(sqlContext: SQLContext,
+      partitionId: String,
+      carbonLoadModel: CarbonLoadModel,
+      storePath: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    LOGGER.audit(s"Add partition request received for table " +
+                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+    try {
+      startSplitThreads(sqlContext,
+        carbonLoadModel,
+        storePath,
+        partitionId,
+        oldPartitionIdList)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in start splitting partition thread. ${ e.getMessage }")
+        throw e
+    }
+  }
+
   def handleCompactionForSystemLocking(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
@@ -345,6 +365,73 @@ object CarbonDataRDDFactory {
     compactionThread.run()
   }
 
+  /**
+   * Thread that runs DataManagementFunc.executePartitionSplit for one segment.
+   * A failure is logged, stored in `failure` and rethrown from run().
+   */
+  case class SplitThread(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      storePath: String,
+      segmentId: String,
+      partitionId: String,
+      oldPartitionIdList: List[Int]) extends Thread {
+    // Set by run() when the split fails. An exception thrown out of run() of a started
+    // thread never reaches the thread that called start()/join(), so the starter has
+    // to read the outcome from here.
+    @volatile var failure: Option[Exception] = None
+
+    override def run(): Unit = {
+      try {
+        DataManagementFunc.executePartitionSplit(sqlContext,
+          carbonLoadModel, executor, storePath, segmentId, partitionId,
+          oldPartitionIdList)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in partition split thread: ${ e.getMessage }")
+          failure = Some(e)
+          // still thrown, as before, for code that invokes run() directly
+          throw new Exception("Exception in split partition " + e.getMessage, e)
+      }
+    }
+  }
+
+  /**
+   * Start one SplitThread per valid segment of the table, wait for all of them and
+   * fail if any of them recorded a failure. The executor is shut down and partial
+   * load data is cleaned up in every case.
+   */
+  def startSplitThreads(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      storePath: String,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val numberOfCores = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+        CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+    val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+    try {
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
+      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      val threads = validSegments.map { segmentId =>
+        val thread = SplitThread(sqlContext, carbonLoadModel, executor, storePath,
+          segmentId, partitionId, oldPartitionIdList)
+        thread.start()
+        thread
+      }
+      threads.foreach(_.join())
+      // join() returns normally even when run() threw, so surface the first failure
+      threads.flatMap(_.failure.toList).headOption.foreach { cause =>
+        throw new Exception("Exception in split partition " + cause.getMessage, cause)
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception when split partition: ${ e.getMessage }")
+        throw e
+    } finally {
+      executor.shutdown()
+      try {
+        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
+                       s" ${ e.getMessage }")
+      }
+    }
+  }
+
   def loadCarbonData(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
@@ -946,7 +1033,7 @@ object CarbonDataRDDFactory {
   }
 
   /**
-   * repartition the input data for partiton table.
+   * repartition the input data for partition table.
    * @param sqlContext
    * @param dataFrame
    * @param carbonLoadModel
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 4711618..9cc12cd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.text.SimpleDateFormat
+import java.util
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.language.implicitConversions
@@ -29,20 +32,24 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import
org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.util.FileUtils
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
+import org.apache.spark.util.{AlterTableUtil, FileUtils, PartitionUtils}
 import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -183,6 +190,136 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
   }
 }
 
+/**
+ * Command for Alter Table Add & Split partition
+ * Add is a special case of Splitting the default partition (part0)
+ *
+ * run() first rewrites the schema with the new partition info (processSchema) and then
+ * starts the data split (processData).
+ *
+ * @param splitPartitionModel table name, optional database name, id of the partition to
+ *                            split and the new partition values
+ */
+case class AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitPartitionModel)
+  extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+  val tableName = splitPartitionModel.tableName
+  val splitInfo = splitPartitionModel.splitInfo
+  val partitionId = splitPartitionModel.partitionId.toInt
+  // state below is filled in by processSchema and read again by processData
+  var partitionInfo: PartitionInfo = null
+  var carbonMetaStore: CarbonMetaStore = null
+  var relation: CarbonRelation = null
+  var dbName: String = null
+  var storePath: String = null
+  var table: CarbonTable = null
+  var carbonTableIdentifier: CarbonTableIdentifier = null
+  val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+  val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+  val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance
+    .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+  val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+    LockUsage.COMPACTION_LOCK,
+    LockUsage.DELETE_SEGMENT_LOCK,
+    LockUsage.DROP_TABLE_LOCK,
+    LockUsage.CLEAN_FILES_LOCK,
+    LockUsage.ALTER_PARTITION_LOCK)
+
+  // TODO will add rollback function in case process data failure
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+    processData(sparkSession)
+  }
+
+  /**
+   * Resolve the table, validate that it is a range/list partitioned table, update its
+   * partition info with the split info and write the changed schema back.
+   */
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    dbName = splitPartitionModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    // must run before the first dereference of relation, otherwise a missing table
+    // surfaces as a NullPointerException instead of this message
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+    storePath = relation.tableMeta.storePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
+      LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+    }
+    table = relation.tableMeta.carbonTable
+    partitionInfo = table.getPartitionInfo(tableName)
+    // same reasoning: validate before partitionInfo is read
+    if (partitionInfo == null) {
+      sys.error(s"Table $tableName is not a partition table.")
+    }
+    if (partitionInfo.getPartitionType == PartitionType.HASH) {
+      sys.error(s"Hash partition table cannot be added or split!")
+    }
+    val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+    // keep a copy of partitionIdList before update partitionInfo.
+    // will be used in partition data scan
+    oldPartitionIds.addAll(partitionIds.asJava)
+    PartitionUtils.updatePartitionInfo(partitionInfo, partitionIds, partitionId,
+      splitInfo, timestampFormatter, dateFormatter)
+
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    // read TableInfo
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+      dbName, tableName, storePath)
+    val tableSchema = wrapperTableInfo.getFactTable
+    tableSchema.setPartitionInfo(partitionInfo)
+    wrapperTableInfo.setFactTable(tableSchema)
+    wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    val thriftTable =
+      schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+    carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+      dbName, tableName, storePath)
+    CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+    // update the schema modified time
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+    sparkSession.catalog.refreshTable(tableName)
+    Seq.empty
+  }
+
+  /**
+   * Acquire the table locks, build a load model for the table and hand the split to
+   * CarbonDataRDDFactory. Locks are released and all caches dropped in every case.
+   */
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    var locks = List.empty[ICarbonLock]
+    var success = false
+    try {
+      locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+        locksToBeAcquired)(sparkSession)
+      val carbonLoadModel = new CarbonLoadModel()
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+      carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
+      carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+      carbonLoadModel.setStorePath(storePath)
+      val loadStartTime = CarbonUpdateUtil.readCurrentTime
+      carbonLoadModel.setFactTimeStamp(loadStartTime)
+      CarbonDataRDDFactory.alterTableSplitPartition(sparkSession.sqlContext,
+        partitionId.toString,
+        carbonLoadModel,
+        relation.tableMeta.storePath,
+        oldPartitionIds.asScala.toList
+      )
+      success = true
+    } catch {
+      case e: Exception =>
+        success = false
+        sys.error(s"Add/Split Partition failed. Please check logs for more info. ${ e.getMessage }")
+    } finally {
+      AlterTableUtil.releaseLocks(locks)
+      CacheProvider.getInstance().dropAllCache()
+      LOGGER.info("Locks released after alter table add/split partition action.")
+      if (success) {
+        LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
+        LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
+      }
+    }
+    Seq.empty
+  }
+}
+
 case class CreateTable(cm: TableModel, createDSTable: Boolean = true)
   extends RunnableCommand with SchemaProcessCommand {
 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 511a61c..780839a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -63,7 +63,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand
 
   protected lazy val startCommand: Parser[LogicalPlan] =
-    loadManagement| showLoads | alterTable | restructure | updateTable | deleteRecords
+    loadManagement|showLoads|alterTable|restructure|updateTable|deleteRecords|alterPartition
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -71,6 +71,30 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val restructure: Parser[LogicalPlan] =
     alterTableModifyDataType | alterTableDropColumn | alterTableAddColumns
 
+  protected lazy val alterPartition: Parser[LogicalPlan] =
+    alterAddPartition | alterSplitPartition
+
+  // ALTER TABLE [db.]table ADD PARTITION ('v1', 'v2', ...)
+  // Modelled as a split of partition "0" into the given values.
+  protected lazy val alterAddPartition: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
+      "(" ~> repsep(stringLit, ",") <~ ")") <~ opt(";") ^^ {
+      case dbName ~ table ~ addInfo =>
+        val alterTableAddPartitionModel =
+          AlterTableSplitPartitionModel(dbName, table, "0", addInfo)
+        AlterTableSplitPartitionCommand(alterTableAddPartitionModel)
+    }
+
+  // ALTER TABLE [db.]table SPLIT PARTITION (id) INTO ('v1', 'v2', ...)
+  // Partition 0 is rejected here; it has to be extended with ADD PARTITION.
+  protected lazy val alterSplitPartition: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (SPLIT ~> PARTITION ~>
+      "(" ~> numericLit <~ ")") ~ (INTO ~> "(" ~> repsep(stringLit, ",") <~ ")") <~ opt(";") ^^ {
+      case dbName ~ table ~ partitionId ~ splitInfo =>
+        // partitionId is a String (the model takes it where "0" is passed above), so
+        // comparing it with the Int 0 directly is always false; compare the parsed value.
+        if (scala.util.Try(partitionId.toInt).toOption.exists(_ == 0)) {
+          sys.error("Please use [Alter Table Add Partition] statement to split default partition!")
+        }
+        val alterTableSplitPartitionModel =
+          AlterTableSplitPartitionModel(dbName, table, partitionId, splitInfo)
+        AlterTableSplitPartitionCommand(alterTableSplitPartitionModel)
+    }
+
   protected lazy val alterTable: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";") ^^ {
       case dbName ~ table ~ (compact ~ compactType) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 41d6bd3..b4389a6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -123,7 +123,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
       // validate partition clause
       if (partitionerFields.nonEmpty) {
         if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
-          throw new MalformedCarbonCommandException("Invalid partition definition")
+          throw new MalformedCarbonCommandException("Error: Invalid partition definition")
         }
         // partition columns should not be part of the schema
         val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
new file mode 100644
index 0000000..7d86468
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.partition + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath + +class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { + + + override def beforeAll { + dropTable + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + /** + * list_table_area_origin + * list_table_area + */ + sql(""" + | CREATE TABLE IF NOT EXISTS list_table_area_origin + | ( + | id Int, + | vin string, + | logdate Timestamp, + | phonenumber Long, + | country string, + | salary Int + | ) + | PARTITIONED BY (area string) + | STORED BY 
'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='LIST', + | 'LIST_INFO'='Asia, America, Europe') + """.stripMargin) + sql(""" + | CREATE TABLE IF NOT EXISTS list_table_area + | ( + | id Int, + | vin string, + | logdate Timestamp, + | phonenumber Long, + | country string, + | salary Int + | ) + | PARTITIONED BY (area string) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='LIST', + | 'LIST_INFO'='Asia, America, Europe') + """.stripMargin) + + /** + * range_table_logdate_origin + * range_table_logdate + */ + sql( + """ + | CREATE TABLE IF NOT EXISTS range_table_logdate_origin + | ( + | id Int, + | vin string, + | phonenumber Long, + | country string, + | area string, + | salary Int + | ) + | PARTITIONED BY (logdate Timestamp) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='RANGE', + | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01') + """.stripMargin) + sql( + """ + | CREATE TABLE IF NOT EXISTS range_table_logdate + | ( + | id Int, + | vin string, + | phonenumber Long, + | country string, + | area string, + | salary Int + | ) + | PARTITIONED BY (logdate Timestamp) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='RANGE', + | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01') + """.stripMargin) + + /** + * list_table_country_origin + * list_table_country + */ + sql( + """ + | CREATE TABLE IF NOT EXISTS list_table_country_origin + | ( + | id Int, + | vin string, + | logdate Timestamp, + | phonenumber Long, + | area string, + | salary Int + | ) + | PARTITIONED BY (country string) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='LIST', + | 'LIST_INFO'='(China, US),UK ,Japan,(Canada,Russia, Good, NotGood), Korea ') + """.stripMargin) + sql( + """ + | CREATE TABLE IF NOT EXISTS list_table_country + | ( + | id Int, + | vin string, + | logdate Timestamp, + | phonenumber Long, + | area string, + | salary Int + | ) + | PARTITIONED BY (country string) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='LIST', 
+ | 'LIST_INFO'='(China, US),UK ,Japan,(Canada,Russia, Good, NotGood), Korea ') + """.stripMargin) + + /** + * range_table_logdate_split_origin + * range_table_logdate_split + */ + sql( + """ + | CREATE TABLE IF NOT EXISTS range_table_logdate_split_origin + | ( + | id Int, + | vin string, + | phonenumber Long, + | country string, + | area string, + | salary Int + | ) + | PARTITIONED BY (logdate Timestamp) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='RANGE', + | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01, 2018/01/01') + """.stripMargin) + sql( + """ + | CREATE TABLE IF NOT EXISTS range_table_logdate_split + | ( + | id Int, + | vin string, + | phonenumber Long, + | country string, + | area string, + | salary Int + | ) + | PARTITIONED BY (logdate Timestamp) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='RANGE', + | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01, 2018/01/01') + """.stripMargin) + + /** + * range_table_bucket_origin + * range_table_bucket + */ + sql( + """ + | CREATE TABLE IF NOT EXISTS range_table_bucket_origin + | ( + | id Int, + | vin string, + | phonenumber Long, + | country string, + | area string, + | salary Int + | ) + | PARTITIONED BY (logdate Timestamp) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='RANGE', + | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01, 2018/01/01', + | 'BUCKETNUMBER'='3', + | 'BUCKETCOLUMNS'='country') + """.stripMargin) + sql( + """ + | CREATE TABLE IF NOT EXISTS range_table_bucket + | ( + | id Int, + | vin string, + | phonenumber Long, + | country string, + | area string, + | salary Int + | ) + | PARTITIONED BY (logdate Timestamp) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='RANGE', + | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01, 2018/01/01', + | 'BUCKETNUMBER'='3', + | 'BUCKETCOLUMNS'='country') + """.stripMargin) + + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE list_table_area_origin 
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_logdate_origin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE list_table_country_origin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_logdate_split_origin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_bucket_origin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE list_table_area OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_logdate OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE list_table_country OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_logdate_split OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/partition_data.csv' INTO TABLE range_table_bucket OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + } + + test("Alter table add partition: List Partition") { + sql("""ALTER TABLE list_table_area ADD PARTITION ('OutSpace', 'Hi')""".stripMargin) + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area") + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds = partitionInfo.getPartitionIds + val list_info = partitionInfo.getListInfo + assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava) + assert(partitionInfo.getMAX_PARTITION == 5) + assert(partitionInfo.getNumPartitions == 6) 
+ assert(list_info.get(0).get(0) == "Asia") + assert(list_info.get(1).get(0) == "America") + assert(list_info.get(2).get(0) == "Europe") + assert(list_info.get(3).get(0) == "OutSpace") + assert(list_info.get(4).get(0) == "Hi") + validateDataFiles("default_list_table_area", "0", Seq(0, 1, 2, 4)) + val result_after = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area") + val result_origin = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin") + checkAnswer(result_after, result_origin) + + val result_after1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area < 'OutSpace' ") + val rssult_origin1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area < 'OutSpace' ") + checkAnswer(result_after1, rssult_origin1) + + val result_after2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area <= 'OutSpace' ") + val result_origin2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area <= 'OutSpace' ") + checkAnswer(result_after2, result_origin2) + + val result_after3 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area = 'OutSpace' ") + val result_origin3 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area = 'OutSpace' ") + checkAnswer(result_after3, result_origin3) + + val result_after4 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area > 'OutSpace' ") + val result_origin4 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area > 'OutSpace' ") + checkAnswer(result_after4, result_origin4) + + val result_after5 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area >= 
'OutSpace' ") + val result_origin5 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area >= 'OutSpace' ") + checkAnswer(result_after5, result_origin5) + + sql("""ALTER TABLE list_table_area ADD PARTITION ('One', '(Two, Three)', 'Four')""".stripMargin) + val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area") + val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds1 = partitionInfo1.getPartitionIds + val new_list_info = partitionInfo1.getListInfo + assert(partitionIds1 == List(0, 1, 2, 3, 4, 5, 6, 7, 8).map(Integer.valueOf(_)).asJava) + assert(partitionInfo1.getMAX_PARTITION == 8) + assert(partitionInfo1.getNumPartitions == 9) + assert(new_list_info.get(0).get(0) == "Asia") + assert(new_list_info.get(1).get(0) == "America") + assert(new_list_info.get(2).get(0) == "Europe") + assert(new_list_info.get(3).get(0) == "OutSpace") + assert(new_list_info.get(4).get(0) == "Hi") + assert(new_list_info.get(5).get(0) == "One") + assert(new_list_info.get(6).get(0) == "Two") + assert(new_list_info.get(6).get(1) == "Three") + assert(new_list_info.get(7).get(0) == "Four") + validateDataFiles("default_list_table_area", "0", Seq(0, 1, 2, 4)) + + val result_after6 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area") + val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin""") + checkAnswer(result_after6, result_origin6) + } + + test("Alter table add partition: Range Partition") { + sql("""ALTER TABLE range_table_logdate ADD PARTITION ('2017/01/01', '2018/01/01')""") + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate") + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds = partitionInfo.getPartitionIds + val range_info = partitionInfo.getRangeInfo + 
assert(partitionIds.size() == 6) + assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava) + assert(partitionInfo.getMAX_PARTITION == 5) + assert(range_info.get(0) == "2014/01/01") + assert(range_info.get(1) == "2015/01/01") + assert(range_info.get(2) == "2016/01/01") + assert(range_info.get(3) == "2017/01/01") + assert(range_info.get(4) == "2018/01/01") + validateDataFiles("default_range_table_logdate", "0", Seq(1, 2, 3, 4, 5)) + val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate""") + val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin""") + checkAnswer(result_after, result_origin) + + val result_after1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate < cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate < cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after1, result_origin1) + + val result_after2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate <= cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate <= cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after2, result_origin2) + + val result_after3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate = cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate = cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after3, result_origin3) + + val result_after4 = 
sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate >= cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate >= cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after4, result_origin4) + + val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate > cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after5, result_origin5) + } + + test("Alter table split partition: List Partition") { + sql("""ALTER TABLE list_table_country SPLIT PARTITION(4) INTO ('Canada', 'Russia', '(Good, NotGood)')""".stripMargin) + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country") + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds = partitionInfo.getPartitionIds + val list_info = partitionInfo.getListInfo + assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5).map(Integer.valueOf(_)).asJava) + assert(partitionInfo.getMAX_PARTITION == 8) + assert(partitionInfo.getNumPartitions == 8) + assert(list_info.get(0).get(0) == "China") + assert(list_info.get(0).get(1) == "US") + assert(list_info.get(1).get(0) == "UK") + assert(list_info.get(2).get(0) == "Japan") + assert(list_info.get(3).get(0) == "Canada") + assert(list_info.get(4).get(0) == "Russia") + assert(list_info.get(5).get(0) == "Good") + assert(list_info.get(5).get(1) == "NotGood") + assert(list_info.get(6).get(0) == "Korea") + validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3, 8)) + val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from 
list_table_country""") + val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin""") + checkAnswer(result_after, result_origin) + + val result_after1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country < 'NotGood' """) + val result_origin1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country < 'NotGood' """) + checkAnswer(result_after1, result_origin1) + + val result_after2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country <= 'NotGood' """) + val result_origin2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country <= 'NotGood' """) + checkAnswer(result_after2, result_origin2) + + val result_after3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country = 'NotGood' """) + val result_origin3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country = 'NotGood' """) + checkAnswer(result_after3, result_origin3) + + val result_after4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country >= 'NotGood' """) + val result_origin4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country >= 'NotGood' """) + checkAnswer(result_after4, result_origin4) + + val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country > 'NotGood' """) + val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country > 'NotGood' """) + checkAnswer(result_after5, result_origin5) + } + + test("Alter table split partition: Range Partition") { + sql("""ALTER TABLE 
range_table_logdate_split SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""") + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split") + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds = partitionInfo.getPartitionIds + val rangeInfo = partitionInfo.getRangeInfo + assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava) + assert(partitionInfo.getMAX_PARTITION == 6) + assert(partitionInfo.getNumPartitions == 6) + assert(rangeInfo.get(0) == "2014/01/01") + assert(rangeInfo.get(1) == "2015/01/01") + assert(rangeInfo.get(2) == "2016/01/01") + assert(rangeInfo.get(3) == "2017/01/01") + assert(rangeInfo.get(4) == "2018/01/01") + validateDataFiles("default_range_table_logdate_split", "0", Seq(1, 2, 3, 5, 6)) + val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split""") + val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin""") + checkAnswer(result_after, result_origin) + + val result_after1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate < cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate < cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after1, result_origin1) + + val result_after2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate <= cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate <= cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after2, result_origin2) + + val result_after3 = 
sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate = cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate = cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after3, result_origin3) + + val result_after4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate >= cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate >= cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after4, result_origin4) + + val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate > cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after5, result_origin5) + } + + test("Alter table split partition: Range Partition + Bucket") { + sql("""ALTER TABLE range_table_bucket SPLIT PARTITION(4) INTO ('2017/01/01', '2018/01/01')""") + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket") + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds = partitionInfo.getPartitionIds + val rangeInfo = partitionInfo.getRangeInfo + assert(partitionIds == List(0, 1, 2, 3, 5, 6).map(Integer.valueOf(_)).asJava) + assert(partitionInfo.getMAX_PARTITION == 6) + assert(partitionInfo.getNumPartitions == 6) + assert(rangeInfo.get(0) == "2014/01/01") + assert(rangeInfo.get(1) == "2015/01/01") + assert(rangeInfo.get(2) == "2016/01/01") + 
assert(rangeInfo.get(3) == "2017/01/01") + assert(rangeInfo.get(4) == "2018/01/01") + validateDataFiles("default_range_table_bucket", "0", Seq(1, 2, 3, 5, 6)) + val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""") + val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin""") + checkAnswer(result_after, result_origin) + + val result_after1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate < cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin1 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after1, result_origin1) + + val result_after2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate <= cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin2 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate <= cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after2, result_origin2) + + val result_origin3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate = cast('2017/01/12 00:00:00' as timestamp) """) + val result_after3 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate = cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_origin3, result_after3) + + val result_after4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate >= cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin4 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate >= cast('2017/01/12 
00:00:00' as timestamp) """) + checkAnswer(result_after4, result_origin4) + + val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate > cast('2017/01/12 00:00:00' as timestamp) """) + val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """) + checkAnswer(result_after5, result_origin5) + } + + def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = { + val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) + val dataFiles = getDataFiles(carbonTable, segmentId) + validatePartitionTableFiles(partitions, dataFiles) + } + + def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = { + val tablePath = new CarbonTablePath(carbonTable.getStorePath, carbonTable.getDatabaseName, + carbonTable.getFactTableName) + val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) + val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) + val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { + override def accept(file: CarbonFile): Boolean = { + return file.getName.endsWith(".carbondata") + } + }) + dataFiles + } + + /** + * should ensure answer equals to expected list, not only contains + * @param partitions + * @param dataFiles + */ + def validatePartitionTableFiles(partitions: Seq[Int], dataFiles: Array[CarbonFile]): Unit = { + val partitionIds: ListBuffer[Int] = new ListBuffer[Int]() + dataFiles.foreach { dataFile => + val partitionId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile.getName).split("_")(0).toInt + partitionIds += partitionId + assert(partitions.contains(partitionId)) + } + partitions.foreach(id => assert(partitionIds.contains(id))) + } + + override def afterAll = { + dropTable + CarbonProperties.getInstance() + 
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + } + + def dropTable { + sql("DROP TABLE IF EXISTS list_table_area_origin") + sql("DROP TABLE IF EXISTS range_table_logdate_origin") + sql("DROP TABLE IF EXISTS list_table_country_origin") + sql("DROP TABLE IF EXISTS range_table_logdate_split_origin") + sql("DROP TABLE IF EXISTS range_table_bucket_origin") + sql("DROP TABLE IF EXISTS list_table_area") + sql("DROP TABLE IF EXISTS range_table_logdate") + sql("DROP TABLE IF EXISTS list_table_country") + sql("DROP TABLE IF EXISTS range_table_logdate_split") + sql("DROP TABLE IF EXISTS range_table_bucket") + } + + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 56aaf54..095e5a3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -402,7 +402,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { private void initTempStoreLocation() { tempStoreLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(carbonLoadModel.getDatabaseName(), tableName, - carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), segmentId, true); + carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), segmentId, + true, false); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---------------------------------------------------------------------- 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java index 518d64b..ccb25e6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java @@ -143,7 +143,7 @@ public final class DataLoadProcessBuilder { String tableName = loadModel.getTableName(); String tempLocationKey = CarbonDataProcessorUtil .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(), - loadModel.getTaskNo(), false); + loadModel.getTaskNo(), false, false); CarbonProperties.getInstance().addProperty(tempLocationKey, StringUtils.join(storeLocation, File.pathSeparator)); CarbonProperties.getInstance() http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java index 1aa06f6..c3cf3c0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java @@ -72,7 +72,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter { CarbonDataProcessorUtil.getLocalDataFolderLocation( sortParameters.getDatabaseName(), sortParameters.getTableName(), String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(), - sortParameters.getSegmentId() + "", false); + sortParameters.getSegmentId() + "", false, 
false); // Set the data file location String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java index 7314b1e..851c384 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java @@ -137,7 +137,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), String.valueOf(sortParameters.getTaskNo()), bucketId, - sortParameters.getSegmentId() + "", false); + sortParameters.getSegmentId() + "", false, false); // Set the data file location String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -188,7 +188,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(), - parameters.getPartitionID(), parameters.getSegmentId(), false); + parameters.getPartitionID(), parameters.getSegmentId(), false, false); String[] tmpLocs = 
CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); parameters.setTempFileLocation(tmpLocs); http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java index e140d86..ebb85b4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java @@ -224,7 +224,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(), batchCount + "", - parameters.getSegmentId(), false); + parameters.getSegmentId(), false, false); String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); parameters.setTempFileLocation(tempDirs); http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java index e508654..f000619 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java @@ -127,7 +127,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter { String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), String.valueOf(sortParameters.getTaskNo()), bucketId, - sortParameters.getSegmentId() + "", false); + sortParameters.getSegmentId() + "", false, false); // Set the data file location String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -170,7 +170,8 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter { private void setTempLocation(SortParameters parameters) { String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(), - parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(), false); + parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(), + false, false); String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); parameters.setTempFileLocation(tmpLoc); http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java index fe9257f..765e0ed 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -91,7 +91,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, - configuration.getSegmentId() + "", false); + configuration.getSegmentId() + "", false, false); CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java index c5f2479..fc4d4d2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterBatchProcessorStepImpl.java @@ -62,7 +62,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), 
String.valueOf(configuration.getTaskNo()), partitionId, - configuration.getSegmentId() + "", false); + configuration.getSegmentId() + "", false, false); CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java index aad874b..c7af420 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java @@ -69,7 +69,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, - configuration.getSegmentId() + "", false); + configuration.getSegmentId() + "", false, false); CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java index 16cab07..fb2977e 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java @@ -412,7 +412,7 @@ public class SortParameters implements Serializable { String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), configuration.getTaskNo(), - configuration.getPartitionId(), configuration.getSegmentId(), false); + configuration.getPartitionId(), configuration.getSegmentId(), false, false); String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -534,7 +534,7 @@ public class SortParameters implements Serializable { String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId, - isCompactionFlow); + isCompactionFlow, false); String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); parameters.setTempFileLocation(sortTempDirs); http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java new file mode 100644 index 0000000..39d1234 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/AbstractCarbonQueryExecutor.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.spliter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.block.TaskBlockInfo; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.scan.executor.QueryExecutor; +import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.model.QueryDimension; +import org.apache.carbondata.core.scan.model.QueryMeasure; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.BatchResult; +import 
org.apache.carbondata.core.util.CarbonUtil;

/**
 * Base class for executors that read carbon table blocks through a forced detail raw query.
 * The protected fields are expected to be populated by subclasses before the helper
 * methods below are used.
 */
public abstract class AbstractCarbonQueryExecutor {

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName());
  protected CarbonTable carbonTable;
  protected QueryModel queryModel;
  protected QueryExecutor queryExecutor;
  protected Map<String, TaskBlockInfo> segmentMapping;

  /**
   * Get an executor for the current query model and execute it on the given blocks.
   * The query model must already have been prepared; the executor that is created replaces
   * any executor kept from a previous call.
   *
   * @param blockList blocks to be scanned
   * @return iterator over the batches produced by the query
   * @throws QueryExecutionException if the query fails
   * @throws IOException             if reading the blocks fails
   */
  protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
      throws QueryExecutionException, IOException {
    queryModel.setTableBlockInfos(blockList);
    // NOTE(review): only the executor of the most recent call is kept, so finish() can
    // only finish that one - confirm whether earlier executors need explicit cleanup.
    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
    return queryExecutor.execute(queryModel);
  }

  /**
   * Prepare a raw detail query model that projects every dimension and every measure
   * of the fact table and applies no filter.
   *
   * @param blockList blocks to set on the model, may be replaced later by executeBlockList
   * @return the prepared query model
   */
  protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
    QueryModel model = new QueryModel();
    model.setTableBlockInfos(blockList);
    model.setForcedDetailRawQuery(true);
    model.setFilterExpressionResolverTree(null);

    String factTableName = carbonTable.getFactTableName();

    // all dimensions returned for the fact table are projected as they are
    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    for (CarbonDimension dim : carbonTable.getDimensionByTableName(factTableName)) {
      QueryDimension queryDimension = new QueryDimension(dim.getColName());
      queryDimension.setDimension(dim);
      dims.add(queryDimension);
    }
    model.setQueryDimension(dims);

    // all measures returned for the fact table are projected as they are
    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    for (CarbonMeasure carbonMeasure : carbonTable.getMeasureByTableName(factTableName)) {
      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
      queryMeasure.setMeasure(carbonMeasure);
      msrs.add(queryMeasure);
    }
    model.setQueryMeasures(msrs);

    model.setQueryId(String.valueOf(System.nanoTime()));
    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
    model.setTable(carbonTable);
    return model;
  }

  /**
   * Below method will be used for cleanup. It is safe to call even when no block list
   * was executed, and the dictionary cleanup always runs.
   */
  public void finish() {
    try {
      // queryExecutor is assigned only by executeBlockList, so it may still be null here
      if (null != queryExecutor) {
        queryExecutor.finish();
      }
    } catch (QueryExecutionException e) {
      LOGGER.error(e, "Problem while finish: ");
    } finally {
      clearDictionaryFromQueryModel();
    }
  }

  /**
   * This method will clear the dictionary access count after its usage is complete so
   * that column can be deleted from LRU cache whenever memory reaches threshold
   */
  private void clearDictionaryFromQueryModel() {
    if (null == queryModel) {
      return;
    }
    Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
    if (null == columnToDictionaryMapping) {
      return;
    }
    for (Dictionary dictionary : columnToDictionaryMapping.values()) {
      CarbonUtil.clearDictionaryCache(dictionary);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java new file mode 100644 index 0000000..7b724ee --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/CarbonSplitExecutor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.carbondata.processing.spliter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;

/**
 * Used to read carbon blocks when add/split partition
 */
public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(CarbonSplitExecutor.class.getName());

  /**
   * @param segmentMapping segment id to the task/block information of that segment
   * @param carbonTable    table whose blocks are read
   */
  public CarbonSplitExecutor(Map<String, TaskBlockInfo> segmentMapping, CarbonTable carbonTable) {
    this.segmentMapping = segmentMapping;
    this.carbonTable = carbonTable;
  }

  /**
   * Execute the raw query once per task of the given segment and wrap each result in a
   * PartitionSpliterRawResultIterator.
   *
   * @param segmentId id of the segment to read
   * @return one iterator per task of the segment; empty when the segment has no mapping
   * @throws QueryExecutionException if a query fails
   * @throws IOException             if reading the blocks fails
   */
  public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId)
      throws QueryExecutionException, IOException {
    List<PartitionSpliterRawResultIterator> resultList =
        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
    // the block list is set per task by executeBlockList, so none is needed up front
    queryModel = prepareQueryModel(null);

    TaskBlockInfo taskBlockInfo = null == segmentMapping ? null : segmentMapping.get(segmentId);
    if (null == taskBlockInfo) {
      // an unknown segment used to fail with a NullPointerException; report it instead
      LOGGER.error("No task block information found for segment " + segmentId);
      return resultList;
    }

    Set<String> tasks = taskBlockInfo.getTaskSet();
    for (String task : tasks) {
      List<TableBlockInfo> blocks = taskBlockInfo.getTableBlockInfoList(task);
      LOGGER.info("for task -" + task + "-block size is -" + blocks.size());
      resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(blocks)));
    }
    return resultList;
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java new file mode 100644 index 0000000..ea38a53 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.carbondata.processing.spliter;

import java.util.List;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.processing.model.CarbonLoadModel;
import org.apache.carbondata.processing.spliter.exception.SliceSpliterException;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

/**
 * Writes the rows produced while adding/splitting a partition through a columnar
 * fact data handler.
 */
public class RowResultSpliterProcessor {

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(RowResultSpliterProcessor.class.getName());

  private final CarbonFactHandler dataHandler;
  private final SegmentProperties segmentProperties;

  /**
   * Create the temporary store locations and set up the fact data handler.
   *
   * @param carbonTable       table being altered
   * @param loadModel         load model supplying the task number and fact timestamp
   * @param segProp           segment properties used to build the handler model and the rows
   * @param tempStoreLocation temporary store locations, created here
   * @param bucketId          bucket id set on the handler model
   */
  public RowResultSpliterProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
      SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
    CarbonDataProcessorUtil.createLocations(tempStoreLocation);
    this.segmentProperties = segProp;
    String tableName = carbonTable.getFactTableName();
    CarbonFactDataHandlerModel carbonFactDataHandlerModel =
        CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
            segProp, tableName, tempStoreLocation);
    CarbonDataFileAttributes carbonDataFileAttributes =
        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
            loadModel.getFactTimeStamp());
    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
    carbonFactDataHandlerModel.setBucketId(bucketId);
    //Note: set compaction flow just to convert decimal type
    carbonFactDataHandlerModel.setCompactionFlow(true);
    this.dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
  }

  /**
   * Write all given rows through the data handler.
   *
   * @param resultList rows to be written
   * @return true when every row was written and the handler was finished and closed
   *         without error, false otherwise
   */
  public boolean execute(List<Object[]> resultList) {
    boolean splitStatus = false;
    // the handler is closed only when its initialisation succeeded
    boolean handlerInitialised = false;
    try {
      dataHandler.initialise();
      handlerInitialised = true;
      for (Object[] row : resultList) {
        addRow(row);
      }
      dataHandler.finish();
      splitStatus = true;
    } catch (SliceSpliterException | CarbonDataWriterException e) {
      // writer failures during initialise/finish are reported through the status as well,
      // in the same way as failures while adding a row
      LOGGER.error(e, "Exception in split partition" + e.getMessage());
      splitStatus = false;
    } finally {
      if (handlerInitialised) {
        try {
          dataHandler.closeHandler();
        } catch (Exception e) {
          LOGGER.error(e,
              "Exception while closing the handler in partition spliter" + e.getMessage());
          splitStatus = false;
        }
      }
    }
    return splitStatus;
  }

  /**
   * Convert one raw result row to a CarbonRow and hand it to the data handler.
   *
   * @param carbonTuple raw row
   * @throws SliceSpliterException if the handler fails to store the row
   */
  private void addRow(Object[] carbonTuple) throws SliceSpliterException {
    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
    try {
      this.dataHandler.addDataToStore(row);
    } catch (CarbonDataWriterException e) {
      throw new SliceSpliterException("Problem in writing rows when add/split the partition", e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java new file mode 100644 index 0000000..17e679a --- /dev/null +++
b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.carbondata.processing.spliter.exception;

import java.util.Locale;

/**
 * Checked exception carrying an error message and, optionally, the underlying cause.
 */
public class SliceSpliterException extends Exception {

  /**
   * default serial version ID.
   */
  private static final long serialVersionUID = 1L;

  /**
   * The Error message.
   */
  private final String msg;

  /**
   * Constructor
   *
   * @param msg The error message for this exception.
   */
  public SliceSpliterException(String msg) {
    super(msg);
    this.msg = msg;
  }

  /**
   * Constructor
   *
   * @param msg The error message for this exception.
   * @param t   The cause of this exception.
   */
  public SliceSpliterException(String msg, Throwable t) {
    super(msg, t);
    this.msg = msg;
  }

  /**
   * This method is used to get the localized message. No translation is available, so
   * the locale is not consulted and the plain error message is returned instead of
   * being discarded.
   *
   * @param locale - A Locale object represents a specific geographical,
   *               political, or cultural region.
   * @return - the error message, or an empty string when no message was supplied.
   */
  public String getLocalizedMessage(Locale locale) {
    return null == msg ? "" : msg;
  }

  /**
   * getLocalizedMessage
   */
  @Override public String getLocalizedMessage() {
    return super.getLocalizedMessage();
  }

  /**
   * getMessage
   */
  @Override public String getMessage() {
    return this.msg;
  }
}
