http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index fc7c13a..22dab27 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -28,6 +28,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.exception.InvalidConfigurationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.util.CarbonUtil @@ -98,6 +99,18 @@ case class CarbonCreateTableCommand( val partitionString = if (partitionInfo != null && partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) { + // Restrict dictionary encoding on partition columns. + // TODO Need to decide whether it is required + val dictionaryOnPartitionColumn = + partitionInfo.getColumnSchemaList.asScala.exists{p => + p.hasEncoding(Encoding.DICTIONARY) && !p.hasEncoding(Encoding.DIRECT_DICTIONARY) + } + if (dictionaryOnPartitionColumn) { + throwMetadataException( + dbName, + tableName, + s"Dictionary include cannot be applied on partition columns") + } s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map( _.getColumnName).mkString(",")})" } else {
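The hunk above rejects plain dictionary encoding (DICTIONARY_INCLUDE) on Hive-partition columns while still permitting direct-dictionary (date/timestamp) columns. A minimal, self-contained sketch of the same predicate follows; ColumnSchema and the Encoding values here are simplified stand-ins for the carbondata-core classes, not the real API:

object PartitionDictionaryCheck {
  // Simplified stand-ins for org.apache.carbondata.core.metadata.encoder.Encoding
  // and the column schema; the real types live in carbondata-core.
  sealed trait Encoding
  case object DICTIONARY extends Encoding
  case object DIRECT_DICTIONARY extends Encoding

  final case class ColumnSchema(columnName: String, encodings: Set[Encoding]) {
    def hasEncoding(e: Encoding): Boolean = encodings.contains(e)
  }

  // A partition column is rejected only when it carries a plain DICTIONARY
  // encoding; DIRECT_DICTIONARY (date/timestamp) columns remain allowed.
  def dictionaryOnPartitionColumn(cols: Seq[ColumnSchema]): Boolean =
    cols.exists(p => p.hasEncoding(DICTIONARY) && !p.hasEncoding(DIRECT_DICTIONARY))

  def main(args: Array[String]): Unit = {
    val cols = Seq(
      ColumnSchema("dt", Set(DICTIONARY, DIRECT_DICTIONARY)), // timestamp partition: allowed
      ColumnSchema("country", Set(DICTIONARY)))               // DICTIONARY_INCLUDE: rejected
    println(dictionaryOnPartitionColumn(cols)) // true -> CREATE TABLE would fail
  }
}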
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index d2c691b..bff65be 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources import java.io.File -import java.text.SimpleDateFormat import java.util import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong @@ -39,16 +38,18 @@ import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.types._ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} -import org.apache.carbondata.core.metadata.PartitionMapFileStore +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.indexstore +import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter import org.apache.carbondata.hadoop.internal.ObjectArrayWritable import org.apache.carbondata.hadoop.util.ObjectSerializationUtil -import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util} @@ -66,6 +67,10 @@ with Serializable { None } + SparkSession.getActiveSession.get.sessionState.conf.setConfString( + "spark.sql.sources.commitProtocolClass", + "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") + override def prepareWrite( sparkSession: SparkSession, job: Job, @@ -77,9 +82,6 @@ with Serializable { classOf[CarbonOutputCommitter], classOf[CarbonOutputCommitter]) conf.set("carbon.commit.protocol", "carbon.commit.protocol") - sparkSession.sessionState.conf.setConfString( - "spark.sql.sources.commitProtocolClass", - "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") job.setOutputFormatClass(classOf[CarbonTableOutputFormat]) val table = CarbonEnv.getCarbonTable( TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession) @@ -114,13 +116,7 @@ with Serializable { model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean) model.setPartitionLoad(true) - // Set the update timestamp if user sets in case of update query. 
It needs to be updated - // in load status update time - val updateTimeStamp = options.get("updatetimestamp") - if (updateTimeStamp.isDefined) { - conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get) - model.setFactTimeStamp(updateTimeStamp.get.toLong) - } + val staticPartition = options.getOrElse("staticpartition", null) if (staticPartition != null) { conf.set("carbon.staticpartition", staticPartition) @@ -131,6 +127,30 @@ with Serializable { if (segemntsTobeDeleted.isDefined) { conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get) } + + val currPartition = options.getOrElse("currentpartition", null) + if (currPartition != null) { + conf.set("carbon.currentpartition", currPartition) + } + // Update with the current in-progress load. + val currEntry = options.getOrElse("currentloadentry", null) + if (currEntry != null) { + val loadEntry = + ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails] + val details = + SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath)) + model.setSegmentId(loadEntry.getLoadName) + model.setFactTimeStamp(loadEntry.getLoadStartTime) + val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava) + list.add(loadEntry) + model.setLoadMetadataDetails(list) + } + // Set the update timestamp if the user sets it in case of an update query. It needs to be updated + // in the load status update time + val updateTimeStamp = options.get("updatetimestamp") + if (updateTimeStamp.isDefined) { + conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get) + } CarbonTableOutputFormat.setLoadModel(conf, model) new OutputWriterFactory { @@ -146,13 +166,14 @@ with Serializable { path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration) val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir var storeLocation: Array[String] = Array[String]() val isCarbonUseLocalDir = CarbonProperties.getInstance() .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true") - val taskNumber = generateTaskNumber(path, context) + val taskNumber = generateTaskNumber(path, context, model.getSegmentId) val tmpLocationSuffix = File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber if (isCarbonUseLocalDir) { @@ -174,7 +195,7 @@ with Serializable { storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix) } CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation) - new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber) + new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model) } /** @@ -182,7 +203,7 @@ with Serializable { * of partition tables.
*/ private def generateTaskNumber(path: String, - context: TaskAttemptContext): String = { + context: TaskAttemptContext, segmentId: String): String = { var partitionNumber: java.lang.Long = taskIdMap.get(path) if (partitionNumber == null) { partitionNumber = counter.incrementAndGet() @@ -190,8 +211,7 @@ with Serializable { taskIdMap.put(path, partitionNumber) } val taskID = context.getTaskAttemptID.getTaskID.getId - String.valueOf(Math.pow(10, 5).toInt + taskID) + - String.valueOf(partitionNumber + Math.pow(10, 5).toInt) + CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber) } override def getFileExtension(context: TaskAttemptContext): String = { @@ -233,9 +253,12 @@ private trait AbstractCarbonOutputWriter { private class CarbonOutputWriter(path: String, context: TaskAttemptContext, fieldTypes: Seq[DataType], - taskNo : String) + taskNo : String, + model: CarbonLoadModel) extends OutputWriter with AbstractCarbonOutputWriter { - val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration) + + val converter = new DataTypeConverterImpl + val partitions = getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName) val staticPartition: util.HashMap[String, Boolean] = { @@ -247,47 +270,52 @@ private class CarbonOutputWriter(path: String, null } } - lazy val (updatedPartitions, partitionData) = if (partitions.nonEmpty) { - val updatedPartitions = partitions.map{ p => - val value = p.substring(p.indexOf("=") + 1, p.length) - val col = p.substring(0, p.indexOf("=")) - // Null handling case. For null hive creates with this special name - if (value.equals("__HIVE_DEFAULT_PARTITION__")) { - (col, null) - // we should replace back the special string with empty value. - } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) { - (col, "") - } else { - (col, value) - } - } - - if (staticPartition != null) { - val loadModel = recordWriter.getLoadModel - val table = loadModel.getCarbonDataLoadSchema.getCarbonTable - var timeStampformatString = loadModel.getTimestampformat - if (timeStampformatString.isEmpty) { - timeStampformatString = loadModel.getDefaultTimestampFormat - } - val timeFormat = new SimpleDateFormat(timeStampformatString) - var dateFormatString = loadModel.getDateFormat - if (dateFormatString.isEmpty) { - dateFormatString = loadModel.getDefaultDateFormat - } - val dateFormat = new SimpleDateFormat(dateFormatString) - val formattedPartitions = updatedPartitions.map {case (col, value) => - // Only convert the static partitions to the carbon format and use it while loading data - // to carbon. - (col, value) - } - (formattedPartitions, updatePartitions(formattedPartitions.map(_._2))) + lazy val currPartitions: util.List[indexstore.PartitionSpec] = { + val currParts = context.getConfiguration.get("carbon.currentpartition") + if (currParts != null) { + ObjectSerializationUtil.convertStringToObject( + currParts).asInstanceOf[util.List[indexstore.PartitionSpec]] } else { - (updatedPartitions, updatePartitions(updatedPartitions.map(_._2))) + new util.ArrayList[indexstore.PartitionSpec]() } + } + var (updatedPartitions, partitionData) = if (partitions.nonEmpty) { + val updatedPartitions = partitions.map(splitPartition) + (updatedPartitions, updatePartitions(updatedPartitions.map(_._2))) } else { (Map.empty[String, String].toArray, Array.empty) } + private def splitPartition(p: String) = { + val value = p.substring(p.indexOf("=") + 1, p.length) + val col = p.substring(0, p.indexOf("=")) + // Null handling case. 
For null hive creates with this special name + if (value.equals("__HIVE_DEFAULT_PARTITION__")) { + (col, null) + // we should replace back the special string with empty value. + } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) { + (col, "") + } else { + (col, value) + } + } + + lazy val writePath = { + val updatedPath = getPartitionPath(path, context, model) + // In case the partition location is specified by the user, search the current + // partitions to get the corresponding partitions. + if (partitions.isEmpty) { + val writeSpec = new indexstore.PartitionSpec(null, updatedPath) + val index = currPartitions.indexOf(writeSpec) + if (index > -1) { + val spec = currPartitions.get(index) + updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray + partitionData = updatePartitions(updatedPartitions.map(_._2)) + } + } + updatedPath + } + val writable = new ObjectArrayWritable private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = { @@ -302,21 +330,30 @@ private class CarbonOutputWriter(path: String, } else { col.getDataType } - if (staticPartition != null) { - DataTypeUtil.getDataBasedOnDataType( + if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) { + val converetedVal = CarbonScalaUtil.convertStaticPartitions( partitionData(index), col, - model.getCarbonDataLoadSchema.getCarbonTable), - dataType) + model.getCarbonDataLoadSchema.getCarbonTable) + if (col.hasEncoding(Encoding.DICTIONARY)) { + converetedVal.toInt.asInstanceOf[AnyRef] + } else { + DataTypeUtil.getDataBasedOnDataType( + converetedVal, + dataType, + converter) + } } else { - DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType) + DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter) } }.toArray } private val recordWriter: CarbonRecordWriter = { context.getConfiguration.set("carbon.outputformat.taskno", taskNo) + context.getConfiguration.set("carbon.outputformat.writepath", + writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp") new CarbonTableOutputFormat() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { new Path(path) @@ -355,51 +392,54 @@ private class CarbonOutputWriter(path: String, override def close(): Unit = { recordWriter.close(context) - val loadModel = recordWriter.getLoadModel - val segmentPath = CarbonTablePath.getSegmentPath(loadModel.getTablePath, loadModel.getSegmentId) - val table = loadModel.getCarbonDataLoadSchema.getCarbonTable - var timeStampformatString = loadModel.getTimestampformat - if (timeStampformatString.isEmpty) { - timeStampformatString = loadModel.getDefaultTimestampFormat - } - val timeFormat = new SimpleDateFormat(timeStampformatString) - var dateFormatString = loadModel.getDateFormat - if (dateFormatString.isEmpty) { - dateFormatString = loadModel.getDefaultDateFormat - } - val dateFormat = new SimpleDateFormat(dateFormatString) - val serializeFormat = - loadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) - val badRecordAction = loadModel.getBadRecordsAction.split(",")(1) - val isEmptyBadRecord = loadModel.getIsEmptyDataBadRecord.split(",")(1).toBoolean // write partition info to new file. 
val partitonList = new util.ArrayList[String]() val formattedPartitions = // All dynamic partitions need to be converted to proper format CarbonScalaUtil.updatePartitions( updatedPartitions.toMap, - table, - timeFormat, - dateFormat) + model.getCarbonDataLoadSchema.getCarbonTable) formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2)) - new PartitionMapFileStore().writePartitionMapFile( - segmentPath, - loadModel.getTaskNo, + SegmentFileStore.writeSegmentFile( + model.getTablePath, + taskNo, + writePath, + model.getSegmentId + "_" + model.getFactTimeStamp + "", partitonList) } + def getPartitionPath(path: String, + attemptContext: TaskAttemptContext, + model: CarbonLoadModel): String = { + if (updatedPartitions.nonEmpty) { + val formattedPartitions = + // All dynamic partitions need to be converted to proper format + CarbonScalaUtil.updatePartitions( + updatedPartitions.toMap, + model.getCarbonDataLoadSchema.getCarbonTable) + val partitionstr = formattedPartitions.map{p => + ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2) + }.mkString(CarbonCommonConstants.FILE_SEPARATOR) + model.getCarbonDataLoadSchema.getCarbonTable.getTablePath + + CarbonCommonConstants.FILE_SEPARATOR + partitionstr + } else { + var updatedPath = FileFactory.getUpdatedFilePath(path) + updatedPath.substring(0, updatedPath.lastIndexOf("/")) + } + } + def getPartitionsFromPath( path: String, attemptContext: TaskAttemptContext, model: CarbonLoadModel): Array[String] = { var attemptId = attemptContext.getTaskAttemptID.toString + "/" - if (path.indexOf(attemptId) <= 0) { - - attemptId = model.getTableName + "/" - } - val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/")) - if (str.length > 0) { - str.split("/") + if (path.indexOf(attemptId) > -1) { + val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/")) + if (str.length > 0) { + str.split("/") + } else { + Array.empty + } } else { Array.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index 544c494..48679b1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.schema.BucketingInfo import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager @@ -143,10 +144,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { projects: Seq[NamedExpression], filterPredicates: Seq[Expression], scanBuilder: (Seq[Attribute], Array[Filter], - ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow]) = { + ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow]) = { val 
names = relation.catalogTable.get.partitionColumnNames // Get the current partitions from table. - var partitions: Seq[String] = null + var partitions: Seq[PartitionSpec] = null if (names.nonEmpty) { val partitionSet = AttributeSet(names .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get)) @@ -167,7 +168,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { CarbonFilters.getPartitions( updatedPartitionFilters.toSeq, SparkSession.getActiveSession.get, - relation.catalogTable.get.identifier) + relation.catalogTable.get.identifier).orNull } pruneFilterProjectRaw( relation, @@ -199,9 +200,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { relation: LogicalRelation, rawProjects: Seq[NamedExpression], filterPredicates: Seq[Expression], - partitions: Seq[String], + partitions: Seq[PartitionSpec], scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter], - ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow]) = { + ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow]) = { val projects = rawProjects.map {p => p.transform { case CustomDeterministicExpression(exp) => exp @@ -350,9 +351,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { private def getDataSourceScan(relation: LogicalRelation, output: Seq[Attribute], - partitions: Seq[String], + partitions: Seq[PartitionSpec], scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter], - ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow], + ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow], candidatePredicates: Seq[Expression], pushedFilters: Seq[Filter], metadata: Map[String, String], http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 0178716..f69ccc1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand} -import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableDropHivePartitionCommand, CarbonShowCarbonPartitionsCommand} +import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableAddHivePartitionCommand, CarbonAlterTableDropHivePartitionCommand, CarbonShowCarbonPartitionsCommand} import org.apache.spark.sql.execution.command.schema._ import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand} import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand} @@ -247,16 +247,19 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { } else { ExecutedCommandExec(rename) :: Nil } - case addPartition@AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, _) => + case 
addPart@AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists) => val dbOption = tableName.database.map(_.toLowerCase) val tableIdentifier = TableIdentifier(tableName.table.toLowerCase(), dbOption) val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .tableExists(tableIdentifier)(sparkSession) - if (isCarbonTable && partitionSpecsAndLocs.exists(_._2.isDefined)) { - throw new UnsupportedOperationException( - "add partition with location is not supported") + if (isCarbonTable) { + ExecutedCommandExec( + CarbonAlterTableAddHivePartitionCommand( + tableName, + partitionSpecsAndLocs, + ifNotExists)) :: Nil } else { - ExecutedCommandExec(addPartition) :: Nil + ExecutedCommandExec(addPart) :: Nil } case RefreshTable(tableIdentifier) => RefreshCarbonTableCommand(tableIdentifier.database, http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala index b8608f4..7bf8536 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala @@ -29,11 +29,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CarbonException +import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} /** @@ -209,9 +212,10 @@ case class CarbonRelation( .getValidAndInvalidSegments.getValidSegments.isEmpty) { sizeInBytesLocalValue = 0L } else { - val tablePath = CarbonStorePath.getCarbonTablePath( + val carbonTablePath = CarbonStorePath.getCarbonTablePath( carbonTable.getTablePath, - carbonTable.getCarbonTableIdentifier).getPath + carbonTable.getCarbonTableIdentifier) + val tablePath = carbonTablePath.getPath val fileType = FileFactory.getFileType(tablePath) if (FileFactory.isFileExist(tablePath, fileType)) { // get the valid segments @@ -220,8 +224,14 @@ case class CarbonRelation( var size = 0L // for each segment calculate the size segments.foreach {validSeg => - size = size + FileFactory.getDirectorySize( - CarbonTablePath.getSegmentPath(tablePath, validSeg)) + if (validSeg.getSegmentFileName != null) { + val fileStore = new SegmentFileStore(tablePath, validSeg.getSegmentFileName) + size = size + CarbonUtil.getSizeOfSegment( + carbonTablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName)) + } else { + size = size + FileFactory.getDirectorySize( + CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo)) + } } // update the new table status time tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime 
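In the CarbonRelation hunk just above, the table size is now computed per valid segment in one of two ways: segments written with the new segment-file layout are measured through SegmentFileStore/CarbonUtil.getSizeOfSegment, while legacy segments fall back to the physical directory size. A compact restatement of that dispatch, with the two measuring functions passed in as hypothetical stand-ins for the CarbonUtil and FileFactory calls:

object SegmentSizeSketch {
  // Illustrative model of a valid-segment entry; the real type comes from
  // SegmentStatusManager.getValidAndInvalidSegments.
  final case class ValidSegment(segmentNo: String, segmentFileName: Option[String])

  def tableSizeInBytes(
      segments: Seq[ValidSegment],
      sizeFromSegmentFile: (String, String) => Long, // stand-in for CarbonUtil.getSizeOfSegment
      directorySize: String => Long,                 // stand-in for FileFactory.getDirectorySize
      segmentPath: String => String): Long =
    segments.foldLeft(0L) { (size, seg) =>
      seg.segmentFileName match {
        // New layout: the segment file lists the data files, so size comes from it.
        case Some(file) => size + sizeFromSegmentFile(seg.segmentNo, file)
        // Legacy layout: no segment file, fall back to the segment directory size.
        case None => size + directorySize(segmentPath(seg.segmentNo))
      }
    }
}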
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala index c7767ce..e5eb53c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.optimizer +import java.util + import scala.collection.JavaConverters._ import org.apache.spark.sql._ @@ -30,14 +32,15 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.PartitionMapFileStore +import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.indexstore.PartitionSpec +import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression} import org.apache.carbondata.core.scan.expression.conditional._ import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression} import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} -import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.util.CarbonScalaUtil @@ -432,15 +435,11 @@ object CarbonFilters { } def getCurrentPartitions(sparkSession: SparkSession, - carbonTable: CarbonTable): Seq[String] = { - if (carbonTable.isHivePartitionTable) { - CarbonFilters.getPartitions( - Seq.empty, - sparkSession, - TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))) - } else { - Seq.empty - } + tableIdentifier: TableIdentifier): Option[Seq[PartitionSpec]] = { + CarbonFilters.getPartitions( + Seq.empty, + sparkSession, + tableIdentifier) } /** @@ -452,11 +451,15 @@ object CarbonFilters { */ def getPartitions(partitionFilters: Seq[Expression], sparkSession: SparkSession, - identifier: TableIdentifier): Seq[String] = { + identifier: TableIdentifier): Option[Seq[PartitionSpec]] = { + val table = CarbonEnv.getCarbonTable(identifier)(sparkSession) // first try to read partitions in case if the trigger comes from the aggregation table load. 
- val partitionsForAggTable = getPartitionsForAggTable(sparkSession, identifier) + val partitionsForAggTable = getPartitionsForAggTable(sparkSession, table) if (partitionsForAggTable.isDefined) { - return partitionsForAggTable.get + return partitionsForAggTable + } + if (!table.isHivePartitionTable) { + return None } val partitions = { try { @@ -483,35 +486,33 @@ object CarbonFilters { identifier) } } - partitions.toList.flatMap { partition => - partition.spec.seq.map{case (column, value) => column + "=" + value} - }.toSet.toSeq + Some(partitions.map { partition => + new PartitionSpec( + new util.ArrayList[String]( partition.spec.seq.map{case (column, value) => + column + "=" + value}.toList.asJava), partition.location) + }) } /** * In case of loading aggregate tables it needs to be get only from the main table load in - * progress segment. So we should read from the partition map file of that segment. + * progress segment. So we should read from the segment file of that segment */ def getPartitionsForAggTable(sparkSession: SparkSession, - identifier: TableIdentifier): Option[Seq[String]] = { + table: CarbonTable): Option[Seq[PartitionSpec]] = { // when validate segments is disabled then only read from partitionmap val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo if (carbonSessionInfo != null) { val validateSegments = carbonSessionInfo.getSessionParams .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + - CarbonEnv.getDatabaseName(identifier.database)(sparkSession) + "." + - identifier.table, "true").toBoolean + table.getDatabaseName + "." + + table.getTableName, "true").toBoolean if (!validateSegments) { val segmentNumbersFromProperty = CarbonProperties.getInstance .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + - CarbonEnv.getDatabaseName(identifier.database)(sparkSession) - + "." + identifier.table) - val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) - val segmentPath = - CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNumbersFromProperty) - val partitionMapper = new PartitionMapFileStore() - partitionMapper.readAllPartitionsOfSegment(segmentPath) - Some(partitionMapper.getPartitionMap.asScala.map(_._2).flatMap(_.asScala).toSet.toSeq) + table.getDatabaseName + "." 
+ table.getTableName) + val segment = Segment.toSegment(segmentNumbersFromProperty) + val segmentFile = new SegmentFileStore(table.getTablePath, segment.getSegmentFileName) + Some(segmentFile.getPartitionSpecs.asScala) } else { None } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index 0b62e10..d45020b 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Experime import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.util.CarbonScalaUtil /** @@ -146,14 +146,8 @@ class CarbonSessionCatalog( ignoreIfExists: Boolean): Unit = { try { val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) - // Get the properties from thread local - val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo - if (carbonSessionInfo != null) { - val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table) - super.createPartitions(tableName, updatedParts, ignoreIfExists) - } else { - super.createPartitions(tableName, parts, ignoreIfExists) - } + val updatedParts = CarbonScalaUtil.updatePartitions(parts, table) + super.createPartitions(tableName, updatedParts, ignoreIfExists) } catch { case e: Exception => super.createPartitions(tableName, parts, ignoreIfExists) http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala index baadd04..b9425d6 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -46,7 +46,7 @@ import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CarbonScalaUtil @@ -144,14 +144,8 @@ class CarbonSessionCatalog( ignoreIfExists: Boolean): Unit = { try { val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) - // Get the properties from thread local - val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo - if (carbonSessionInfo != null) { - val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table) - 
super.createPartitions(tableName, updatedParts, ignoreIfExists) - } else { - super.createPartitions(tableName, parts, ignoreIfExists) - } + val updatedParts = CarbonScalaUtil.updatePartitions(parts, table) + super.createPartitions(tableName, updatedParts, ignoreIfExists) } catch { case e: Exception => super.createPartitions(tableName, parts, ignoreIfExists) http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 4b0113c..d739f8c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -27,6 +27,7 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.DataMapStoreManager; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.datamap.dev.DataMapWriter; @@ -71,7 +72,7 @@ public class DataMapWriterListener { } List<String> columns = factory.getMeta().getIndexedColumns(); List<DataMapWriter> writers = registry.get(columns); - DataMapWriter writer = factory.createWriter(segmentId); + DataMapWriter writer = factory.createWriter(new Segment(segmentId, null)); if (writers != null) { writers.add(writer); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java index 7b1ab9d..b7270b9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java @@ -109,6 +109,11 @@ public class CarbonDataLoadConfiguration { */ private short writingCoresCount; + /** + * Folder path to where data should be written for this load. 
+ */ + private String dataWritePath; + public CarbonDataLoadConfiguration() { } @@ -363,4 +368,12 @@ public class CarbonDataLoadConfiguration { public void setWritingCoresCount(short writingCoresCount) { this.writingCoresCount = writingCoresCount; } + + public String getDataWritePath() { + return dataWritePath; + } + + public void setDataWritePath(String dataWritePath) { + this.dataWritePath = dataWritePath; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index ba24d41..f5b29e7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -57,7 +57,8 @@ public final class DataLoadProcessBuilder { CarbonIterator[] inputIterators) throws Exception { CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation); SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration); - if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) { + if ((!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) + && !loadModel.isPartitionLoad()) { return buildInternalForNoSort(inputIterators, configuration); } else if (configuration.getBucketingInfo() != null) { return buildInternalForBucketing(inputIterators, configuration); @@ -251,6 +252,7 @@ public final class DataLoadProcessBuilder { configuration.setPreFetch(loadModel.isPreFetch()); configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns()); configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns()); + configuration.setDataWritePath(loadModel.getDataWritePath()); // For partition loading always use single core as it already runs in multiple // threads per partition if (carbonTable.isHivePartitionTable()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java index 85eb19b..6664a2c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java @@ -89,7 +89,7 @@ public class MeasureFieldConverterImpl implements FieldConverter { } else { try { if (dataField.isUseActualData()) { - output = DataTypeUtil.getConvertedMeasureValueBasedOnDataType(value, dataType, measure); + output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure, true); } else { output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure); } 
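A few hunks back, DataLoadProcessBuilder gains one extra condition: even a no-sort table must not take the no-sort pipeline when the load is a partition load. A small restatement of that routing decision; the names below are illustrative, not the builder's actual API:

object LoadPipelineSketch {
  sealed trait LoadPipeline
  case object NoSortPipeline extends LoadPipeline
  case object BucketingPipeline extends LoadPipeline
  case object SortPipeline extends LoadPipeline

  // Mirrors the updated condition in DataLoadProcessBuilder.build: the no-sort
  // shortcut is taken only when this is not a partition load.
  def choosePipeline(
      isSortTable: Boolean,
      isNoSortScope: Boolean,
      hasBucketingInfo: Boolean,
      isPartitionLoad: Boolean): LoadPipeline =
    if ((!isSortTable || isNoSortScope) && !isPartitionLoad) NoSortPipeline
    else if (hasBucketingInfo) BucketingPipeline
    else SortPipeline
}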
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index 4c536ea..a17178a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -193,6 +193,11 @@ public class CarbonLoadModel implements Serializable { */ private boolean isPartitionLoad; + /** + * Folder path to where data should be written for this load. + */ + private String dataWritePath; + public boolean isAggLoadRequest() { return isAggLoadRequest; } @@ -870,4 +875,12 @@ public class CarbonLoadModel implements Serializable { public void setPartitionLoad(boolean partitionLoad) { isPartitionLoad = partitionLoad; } + + public String getDataWritePath() { + return dataWritePath; + } + + public void setDataWritePath(String dataWritePath) { + this.dataWritePath = dataWritePath; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index be27866..3f1430d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -26,6 +26,7 @@ import java.util.*; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -42,6 +43,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl; @@ -280,7 +282,7 @@ public final class CarbonDataMergerUtil { */ public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel, - CompactionType compactionType) throws IOException { + CompactionType compactionType, String segmentFile) throws IOException { boolean tableStatusUpdationStatus = false; AbsoluteTableIdentifier absoluteTableIdentifier = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); @@ -325,6 +327,7 @@ public final class CarbonDataMergerUtil { 
loadMetadataDetails.setLoadEndTime(loadEnddate); CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(); loadMetadataDetails.setLoadName(mergedLoadNumber); + loadMetadataDetails.setSegmentFile(segmentFile); CarbonLoaderUtil .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, mergedLoadNumber, carbonTable); loadMetadataDetails.setLoadStartTime(carbonLoadModel.getFactTimeStamp()); @@ -385,7 +388,7 @@ public final class CarbonDataMergerUtil { */ public static List<LoadMetadataDetails> identifySegmentsToBeMerged( CarbonLoadModel carbonLoadModel, long compactionSize, - List<LoadMetadataDetails> segments, CompactionType compactionType) { + List<LoadMetadataDetails> segments, CompactionType compactionType) throws IOException { String tablePath = carbonLoadModel.getTablePath(); Map<String, String> tableLevelProperties = carbonLoadModel.getCarbonDataLoadSchema() .getCarbonTable().getTableInfo().getFactTable().getTableProperties(); @@ -590,13 +593,13 @@ public final class CarbonDataMergerUtil { */ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize( long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve, - CarbonLoadModel carbonLoadModel, String tablePath) { + CarbonLoadModel carbonLoadModel, String tablePath) throws IOException { List<LoadMetadataDetails> segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - CarbonTableIdentifier tableIdentifier = - carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(); + CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(); + CarbonTableIdentifier tableIdentifier = carbonTable.getCarbonTableIdentifier(); // total length @@ -612,8 +615,15 @@ public final class CarbonDataMergerUtil { String segId = segment.getLoadName(); // variable to store one segment size across partition. - long sizeOfOneSegmentAcrossPartition = - getSizeOfSegment(tablePath, tableIdentifier, segId); + long sizeOfOneSegmentAcrossPartition; + if (segment.getSegmentFile() != null) { + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier()); + sizeOfOneSegmentAcrossPartition = CarbonUtil + .getSizeOfSegment(carbonTablePath, new Segment(segId, segment.getSegmentFile())); + } else { + sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, tableIdentifier, segId); + } // if size of a segment is greater than the Major compaction size. then ignore it. if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) { @@ -863,18 +873,18 @@ public final class CarbonDataMergerUtil { * @param loadMetadataDetails * @return */ - public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) { - StringBuilder builder = new StringBuilder(); + public static List<Segment> getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) { + List<Segment> segments = new ArrayList<>(); for (LoadMetadataDetails segment : loadMetadataDetails) { //check if this load is an already merged load. 
if (null != segment.getMergedLoadName()) { - builder.append(segment.getMergedLoadName()).append(","); + + segments.add(Segment.toSegment(segment.getMergedLoadName())); } else { - builder.append(segment.getLoadName()).append(","); + segments.add(Segment.toSegment(segment.getLoadName())); } } - builder.deleteCharAt(builder.length() - 1); - return builder.toString(); + return segments; } /** @@ -883,7 +893,7 @@ public final class CarbonDataMergerUtil { * @param absoluteTableIdentifier * @return */ - public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier) + public static List<Segment> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException { SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null; @@ -933,7 +943,8 @@ public final class CarbonDataMergerUtil { int numberUpdateDeltaFilesThreshold = CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction(); for (LoadMetadataDetails seg : segments) { - if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(seg.getLoadName(), + if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg( + new Segment(seg.getLoadName(), seg.getSegmentFile()), absoluteTableIdentifier, carbonLoadModel.getSegmentUpdateStatusManager(), numberUpdateDeltaFilesThreshold)) { validSegments.add(seg); @@ -950,12 +961,12 @@ public final class CarbonDataMergerUtil { /** * method gets the segments list which get qualified for IUD compaction. - * @param Segments + * @param segments * @param absoluteTableIdentifier * @param compactionTypeIUD * @return */ - public static List<String> getSegListIUDCompactionQualified(List<String> Segments, + public static List<String> getSegListIUDCompactionQualified(List<Segment> segments, AbsoluteTableIdentifier absoluteTableIdentifier, SegmentUpdateStatusManager segmentUpdateStatusManager, CompactionType compactionTypeIUD) { @@ -964,8 +975,8 @@ public final class CarbonDataMergerUtil { if (CompactionType.IUD_DELETE_DELTA == compactionTypeIUD) { int numberDeleteDeltaFilesThreshold = CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction(); - List<String> deleteSegments = new ArrayList<>(); - for (String seg : Segments) { + List<Segment> deleteSegments = new ArrayList<>(); + for (Segment seg : segments) { if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager, numberDeleteDeltaFilesThreshold)) { deleteSegments.add(seg); @@ -975,7 +986,7 @@ public final class CarbonDataMergerUtil { // This Code Block Append the Segname along with the Blocks selected for Merge instead of // only taking the segment name. This will help to parallelize better for each block // in case of Delete Horizontal Compaction. 
- for (String segName : deleteSegments) { + for (Segment segName : deleteSegments) { List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager, numberDeleteDeltaFilesThreshold); validSegments.addAll(tempSegments); @@ -984,10 +995,10 @@ public final class CarbonDataMergerUtil { } else if (CompactionType.IUD_UPDDEL_DELTA == compactionTypeIUD) { int numberUpdateDeltaFilesThreshold = CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction(); - for (String seg : Segments) { + for (Segment seg : segments) { if (checkUpdateDeltaFilesInSeg(seg, absoluteTableIdentifier, segmentUpdateStatusManager, numberUpdateDeltaFilesThreshold)) { - validSegments.add(seg); + validSegments.add(seg.getSegmentNo()); } } } @@ -1027,7 +1038,7 @@ public final class CarbonDataMergerUtil { * @param numberDeltaFilesThreshold * @return */ - public static Boolean checkUpdateDeltaFilesInSeg(String seg, + public static Boolean checkUpdateDeltaFilesInSeg(Segment seg, AbsoluteTableIdentifier absoluteTableIdentifier, SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { @@ -1036,14 +1047,14 @@ public final class CarbonDataMergerUtil { CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg.getSegmentNo()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); updateDeltaFiles = segmentUpdateStatusManager - .getUpdateDeltaFilesForSegment(seg, true, CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, - false, allSegmentFiles); + .getUpdateDeltaFilesForSegment(seg.getSegmentNo(), true, + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, false, allSegmentFiles); if (updateDeltaFiles == null) { return false; @@ -1079,11 +1090,12 @@ public final class CarbonDataMergerUtil { * @param numberDeltaFilesThreshold * @return */ - private static boolean checkDeleteDeltaFilesInSeg(String seg, + private static boolean checkDeleteDeltaFilesInSeg(Segment seg, SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { Set<String> uniqueBlocks = new HashSet<String>(); - List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg); + List<String> blockNameList = + segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo()); for (final String blockName : blockNameList) { @@ -1121,11 +1133,12 @@ public final class CarbonDataMergerUtil { * @return */ - private static List<String> getDeleteDeltaFilesInSeg(String seg, + private static List<String> getDeleteDeltaFilesInSeg(Segment seg, SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { List<String> blockLists = new ArrayList<>(); - List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg); + List<String> blockNameList = + segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo()); for (final String blockName : blockNameList) { @@ -1133,7 +1146,7 @@ public final class CarbonDataMergerUtil { segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName); if (deleteDeltaFiles.length > numberDeltaFilesThreshold) { - blockLists.add(seg + "/" + blockName); + blockLists.add(seg.getSegmentNo() + "/" + blockName); } } return blockLists; @@ -1177,7 +1190,7 @@ public final class CarbonDataMergerUtil { 
segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails); CarbonFile[] deleteDeltaFiles = - segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName); + segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg, null), blockName); String destFileName = blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT; http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 2480a39..2fbdf4f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -26,7 +26,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.PartitionMapFileStore; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -35,7 +36,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger; @@ -129,19 +129,20 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { */ private SortIntermediateFileMerger intermediateFileMerger; - private List<String> partitionNames; + private PartitionSpec partitionSpec; + private SortParameters sortParameters; public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTable carbonTable, SegmentProperties segmentProperties, CompactionType compactionType, String tableName, - List<String> partitionNames) { + PartitionSpec partitionSpec) { this.carbonLoadModel = carbonLoadModel; this.carbonTable = carbonTable; this.segmentProperties = segmentProperties; this.segmentId = carbonLoadModel.getSegmentId(); this.compactionType = compactionType; this.tableName = tableName; - this.partitionNames = partitionNames; + this.partitionSpec = partitionSpec; } /** @@ -168,14 +169,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { } catch (Exception e) { LOGGER.error(e, "Compaction failed: " + e.getMessage()); } finally { - if (partitionNames != null) { + if (partitionSpec != null) { try { - new PartitionMapFileStore().writePartitionMapFile( - CarbonTablePath.getSegmentPath( - 
carbonLoadModel.getTablePath(), - carbonLoadModel.getSegmentId()), - carbonLoadModel.getTaskNo(), - partitionNames); + SegmentFileStore + .writeSegmentFile(carbonLoadModel.getTablePath(), carbonLoadModel.getTaskNo(), + partitionSpec.getLocation().toString(), carbonLoadModel.getFactTimeStamp() + "", + partitionSpec.getPartitions()); } catch (IOException e) { LOGGER.error(e, "Compaction failed: " + e.getMessage()); isCompactionSuccess = false; @@ -401,9 +400,19 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { * initialise carbon data writer instance */ private void initDataHandler() throws Exception { + String carbonStoreLocation; + if (partitionSpec != null) { + carbonStoreLocation = + partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + + carbonLoadModel.getFactTimeStamp() + ".tmp"; + } else { + carbonStoreLocation = CarbonDataProcessorUtil + .createCarbonStoreLocation(carbonTable.getTablePath(), carbonLoadModel.getDatabaseName(), + tableName, carbonLoadModel.getPartitionId(), carbonLoadModel.getSegmentId()); + } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName, - tempStoreLocation); + tempStoreLocation, carbonStoreLocation); setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable, carbonFactDataHandlerModel); dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel, http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 3d0700b..b41829f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -24,18 +24,19 @@ import java.util.PriorityQueue; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.PartitionMapFileStore; +import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; import org.apache.carbondata.core.util.ByteUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.exception.SliceMergerException; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar; 
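With PartitionSpec replacing the old List<String> of partition names, both compaction processors now choose the writer's output directory the same way: data for a partitioned table is staged directly under the partition's own location in a "<factTimeStamp>.tmp" directory (presumably renamed when the load commits; the rename is not shown in this patch), while non-partitioned tables keep the conventional segment path, and a segment file describing the result is written via SegmentFileStore.writeSegmentFile once merging finishes. A condensed sketch of the location choice, using only the calls visible in the hunks above; the wrapper class and method names are illustrative, not part of the code base:

    import org.apache.carbondata.core.constants.CarbonCommonConstants;
    import org.apache.carbondata.core.indexstore.PartitionSpec;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

    // Illustrative helper only; both processors inline this logic rather than share it.
    public final class StoreLocationSketch {
      static String chooseStoreLocation(PartitionSpec partitionSpec, CarbonLoadModel loadModel,
          CarbonTable carbonTable, String tableName) {
        if (partitionSpec != null) {
          // Partitioned flow: stage files in the partition's own folder, under a
          // temporary directory named after the fact timestamp.
          return partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR
              + loadModel.getFactTimeStamp() + ".tmp";
        }
        // Non-partitioned flow: the conventional segment directory under the table path.
        return CarbonDataProcessorUtil.createCarbonStoreLocation(carbonTable.getTablePath(),
            loadModel.getDatabaseName(), tableName, loadModel.getPartitionId(),
            loadModel.getSegmentId());
      }
    }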
@@ -51,7 +52,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { private CarbonFactHandler dataHandler; private SegmentProperties segprop; private CarbonLoadModel loadModel; - private List<String> partitionNames; + private PartitionSpec partitionSpec; /** * record holder heap */ @@ -62,16 +63,26 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { public RowResultMergerProcessor(String databaseName, String tableName, SegmentProperties segProp, String[] tempStoreLocation, - CarbonLoadModel loadModel, CompactionType compactionType, List<String> partitionNames) { + CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec) { this.segprop = segProp; - this.partitionNames = partitionNames; + this.partitionSpec = partitionSpec; this.loadModel = loadModel; CarbonDataProcessorUtil.createLocations(tempStoreLocation); CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); + String carbonStoreLocation; + if (partitionSpec != null) { + carbonStoreLocation = + partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel + .getFactTimeStamp() + ".tmp"; + } else { + carbonStoreLocation = CarbonDataProcessorUtil + .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(), + tableName, loadModel.getPartitionId(), loadModel.getSegmentId()); + } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, - tempStoreLocation); + tempStoreLocation, carbonStoreLocation); setDataFileAttributesInModel(loadModel, compactionType, carbonTable, carbonFactDataHandlerModel); carbonFactDataHandlerModel.setCompactionFlow(true); @@ -157,14 +168,13 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { if (isDataPresent) { this.dataHandler.closeHandler(); } - if (partitionNames != null) { - new PartitionMapFileStore().writePartitionMapFile( - CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()), - loadModel.getTaskNo(), - partitionNames); + if (partitionSpec != null) { + SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getTaskNo(), + partitionSpec.getLocation().toString(), loadModel.getFactTimeStamp() + "", + partitionSpec.getPartitions()); } } catch (CarbonDataWriterException | IOException e) { - LOGGER.error(e,"Exception in compaction merger"); + LOGGER.error(e, "Exception in compaction merger"); mergeStatus = false; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index 68a212e..ff6ca93 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -47,9 +47,12 @@ public class RowResultProcessor { CarbonDataProcessorUtil.createLocations(tempStoreLocation); this.segmentProperties = segProp; String tableName = carbonTable.getTableName(); + String carbonStoreLocation = CarbonDataProcessorUtil + 
.createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(), + tableName, loadModel.getPartitionId(), loadModel.getSegmentId()); CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable, - segProp, tableName, tempStoreLocation); + segProp, tableName, tempStoreLocation, carbonStoreLocation); CarbonDataFileAttributes carbonDataFileAttributes = new CarbonDataFileAttributes(Long.parseLong(loadModel.getTaskNo()), loadModel.getFactTimeStamp()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index d15152c..d77fcab 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -275,7 +275,7 @@ public class CarbonFactDataHandlerModel { */ public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel, CarbonTable carbonTable, SegmentProperties segmentProperties, String tableName, - String[] tempStoreLocation) { + String[] tempStoreLocation, String carbonDataDirectoryPath) { CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel(); carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime()); carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName()); @@ -307,9 +307,7 @@ public class CarbonFactDataHandlerModel { measureDataTypes[i++] = msr.getDataType(); } carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes); - String carbonDataDirectoryPath = CarbonDataProcessorUtil - .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(), - tableName, loadModel.getPartitionId(), loadModel.getSegmentId()); + CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath); List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName); boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()]; @@ -334,6 +332,10 @@ public class CarbonFactDataHandlerModel { * @return data directory path */ private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) { + if (configuration.getDataWritePath() != null) { + CarbonUtil.checkAndCreateFolder(configuration.getDataWritePath()); + return configuration.getDataWritePath(); + } AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier(); CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); String carbonDataDirectoryPath = carbonTablePath http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 376a546..1e648e1 
100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -302,7 +302,7 @@ } public static boolean isHeaderValid(String tableName, String[] csvHeader, - CarbonDataLoadSchema schema) { + CarbonDataLoadSchema schema, List<String> ignoreColumns) { Iterator<String> columnIterator = CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName).iterator(); Set<String> csvColumns = new HashSet<String>(csvHeader.length); @@ -311,7 +311,8 @@ // file header should contain all columns of carbon table. // So csvColumns should contain all elements of columnIterator. while (columnIterator.hasNext()) { - if (!csvColumns.contains(columnIterator.next().toLowerCase())) { + String column = columnIterator.next().toLowerCase(); + if (!csvColumns.contains(column) && !ignoreColumns.contains(column)) { return false; } } @@ -377,7 +378,7 @@ * * @return data directory path */ - public static String checkAndCreateCarbonStoreLocation(String factStoreLocation, + public static String createCarbonStoreLocation(String factStoreLocation, String databaseName, String tableName, String partitionId, String segmentId) { CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); @@ -385,7 +386,6 @@ CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier); String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId); - CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); return carbonDataDirectoryPath; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 00f13a5..32c72da 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.cache.CacheType; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.block.Distributable; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; @@ -166,6 +167,23 @@ public final class CarbonLoaderUtil { public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry, CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid) throws IOException { + return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, uuid, + new ArrayList<Segment>(), new ArrayList<Segment>()); + } + + /** + * This API will write the load level metadata for the load management module
in order to + manage the load and query execution smoothly. + * + * @param newMetaEntry + * @param loadModel + * @param uuid + * @return boolean which determines whether status update is done or not. + * @throws IOException + */ + public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry, + CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid, + List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) throws IOException { boolean status = false; AbsoluteTableIdentifier absoluteTableIdentifier = loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); @@ -237,9 +255,11 @@ public final class CarbonLoaderUtil { // existing entry needs to be overwritten as the entry will exist with some // intermediate status int indexToOverwriteNewMetaEntry = 0; + boolean found = false; for (LoadMetadataDetails entry : listOfLoadFolderDetails) { if (entry.getLoadName().equals(newMetaEntry.getLoadName()) && entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) { + found = true; break; } indexToOverwriteNewMetaEntry++; @@ -254,6 +274,10 @@ public final class CarbonLoaderUtil { } } } + if (!found) { + LOGGER.error("Entry not found to update " + newMetaEntry + " From list :: " + + listOfLoadFolderDetails); + } listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry); } // when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE @@ -262,6 +286,17 @@ public final class CarbonLoaderUtil { addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry); } + for (LoadMetadataDetails detail: listOfLoadFolderDetails) { + // if the segment is in the list marked for delete then update its status. + if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName(), null))) { + detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); + } else if (segmentFilesTobeUpdated.contains(Segment.toSegment(detail.getLoadName()))) { + detail.setSegmentFile( + detail.getLoadName() + "_" + newMetaEntry.getUpdateStatusFileName() + + CarbonTablePath.SEGMENT_EXT); + } + } + SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()])); // Delete all old stale segment folders @@ -907,8 +942,8 @@ public final class CarbonLoaderUtil { String segmentId, CarbonTable carbonTable) throws IOException { CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier())); - Map<String, Long> dataIndexSize = - CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId); + Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, + new Segment(segmentId, loadMetadataDetails.getSegmentFile())); Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE); loadMetadataDetails.setDataSize(String.valueOf(dataSize)); Long indexSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE); http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java index 02ab1d8..52b9f52 100644 ---
a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java @@ -18,16 +18,19 @@ package org.apache.carbondata.processing.util; import java.io.IOException; +import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.locks.CarbonLockFactory; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.locks.LockUsage; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; @@ -60,54 +63,61 @@ public final class DeleteLoadFolders { } public static void physicalFactAndMeasureMetadataDeletion( - AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete) { + AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete, + List<PartitionSpec> specs) { LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath); for (LoadMetadataDetails oneLoad : currentDetails) { if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { - String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad); - boolean status = false; try { - if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { - CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path)); - CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { - - @Override public boolean accept(CarbonFile file) { - return (CarbonTablePath.isCarbonDataFile(file.getName()) - || CarbonTablePath.isCarbonIndexFile(file.getName()) - || CarbonTablePath.isPartitionMapFile(file.getName())); - } - }); + if (oneLoad.getSegmentFile() != null) { + SegmentFileStore + .deleteSegment(absoluteTableIdentifier.getTablePath(), oneLoad.getSegmentFile(), + specs); + } else { + String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad); + boolean status = false; + if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { + CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path)); + CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() { + + @Override public boolean accept(CarbonFile file) { + return (CarbonTablePath.isCarbonDataFile(file.getName()) || + CarbonTablePath.isCarbonIndexFile(file.getName())); + } + }); - //if there are no fact and msr metadata files present then no need to keep - //entry in metadata. - if (filesToBeDeleted.length == 0) { - status = true; - } else { + //if there are no fact and msr metadata files present then no need to keep + //entry in metadata. 
+ if (filesToBeDeleted.length == 0) { + status = true; + } else { - for (CarbonFile eachFile : filesToBeDeleted) { - if (!eachFile.delete()) { - LOGGER.warn("Unable to delete the file as per delete command " + eachFile - .getAbsolutePath()); - status = false; - } else { - status = true; + for (CarbonFile eachFile : filesToBeDeleted) { + if (!eachFile.delete()) { + LOGGER.warn("Unable to delete the file as per delete command " + eachFile + .getAbsolutePath()); + status = false; + } else { + status = true; + } } } - } - // need to delete the complete folder. - if (status) { - if (!file.delete()) { - LOGGER.warn( - "Unable to delete the folder as per delete command " + file.getAbsolutePath()); + // need to delete the complete folder. + if (status) { + if (!file.delete()) { + LOGGER.warn("Unable to delete the folder as per delete command " + file + .getAbsolutePath()); + } } + + } else { + LOGGER.warn("Files are not found in segment " + path + + " it seems, files are already being deleted"); } - } else { - LOGGER.warn("Files are not found in segment " + path - + " it seems, files are already being deleted"); } } catch (IOException e) { - LOGGER.warn("Unable to delete the file as per delete command " + path); + LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName()); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java index 63320ef..cd1e28a 100644 --- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java +++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java @@ -83,7 +83,7 @@ public class BlockIndexStoreTest extends TestCase { // assertTrue(false); // } // List<String> segmentIds = new ArrayList<>(); -// segmentIds.add(info.getSegmentId()); +// segmentIds.add(info.getSegment()); // cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); // } // @@ -151,7 +151,7 @@ public class BlockIndexStoreTest extends TestCase { // } // List<String> segmentIds = new ArrayList<>(); // for (TableBlockInfo tableBlockInfo : tableBlockInfos) { -// segmentIds.add(tableBlockInfo.getSegmentId()); +// segmentIds.add(tableBlockInfo.getSegment()); // } // cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); // } @@ -223,7 +223,7 @@ public class BlockIndexStoreTest extends TestCase { // } // List<String> segmentIds = new ArrayList<>(); // for (TableBlockInfo tableBlockInfo : tableBlockInfos) { -// segmentIds.add(tableBlockInfo.getSegmentId()); +// segmentIds.add(tableBlockInfo.getSegment()); // } // cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); // }
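Taken together with the compaction-processor changes earlier in this patch, DeleteLoadFolders now distinguishes two cleanup paths: loads that recorded a segment file are removed through SegmentFileStore.deleteSegment, which can also reach files stored under external partition locations passed in as specs, while legacy loads without one fall back to scanning and deleting the physical segment folder. A condensed sketch mirroring the control flow of physicalFactAndMeasureMetadataDeletion; the class and method names here are illustrative, not part of the code base:

    import java.io.IOException;
    import java.util.List;

    import org.apache.carbondata.core.indexstore.PartitionSpec;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    import org.apache.carbondata.core.metadata.SegmentFileStore;
    import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;

    final class DeletionSketch {
      // Illustrative only; mirrors the branch added above.
      static void deleteLoad(AbsoluteTableIdentifier identifier, LoadMetadataDetails oneLoad,
          List<PartitionSpec> specs) throws IOException {
        if (oneLoad.getSegmentFile() != null) {
          // New-style load: the segment file enumerates every data/index file,
          // including files living under external partition locations (specs).
          SegmentFileStore.deleteSegment(identifier.getTablePath(), oneLoad.getSegmentFile(),
              specs);
        } else {
          // Legacy load: derive the segment folder from the table path and delete the
          // carbondata/index files found by listing it (folder scan elided in this sketch).
        }
      }
    }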