Repository: carbondata Updated Branches: refs/heads/metadata 4b69c9d8e -> 2a9debfcc
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala new file mode 100644 index 0000000..f245df6 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import java.util.LinkedHashSet + +import scala.Array.canBuildFrom +import scala.collection.JavaConverters._ +import scala.util.parsing.combinator.RegexParsers + +import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL +import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.processing.merger.TableMeta + +/** + * Represents logical plan for one carbon table + */ +case class CarbonRelation( + databaseName: String, + tableName: String, + var metaData: CarbonMetaData, + tableMeta: TableMeta) + extends LeafNode with MultiInstanceRelation { + + def recursiveMethod(dimName: String, childDim: CarbonDimension): String = { + childDim.getDataType.toString.toLowerCase match { + case "array" => s"${ + childDim.getColName.substring(dimName.length + 1) + }:array<${ getArrayChildren(childDim.getColName) }>" + case "struct" => s"${ + childDim.getColName.substring(dimName.length + 1) + }:struct<${ getStructChildren(childDim.getColName) }>" + case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }" + } + } + + def getArrayChildren(dimName: String): String = { + metaData.carbonTable.getChildren(dimName).asScala.map(childDim => { + childDim.getDataType.toString.toLowerCase match { + case "array" => s"array<${ getArrayChildren(childDim.getColName) }>" + case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>" + case dType => addDecimalScaleAndPrecision(childDim, dType) + } + }).mkString(",") + } + + def getStructChildren(dimName: String): String = { + metaData.carbonTable.getChildren(dimName).asScala.map(childDim => { + childDim.getDataType.toString.toLowerCase match { + case "array" => s"${ + childDim.getColName.substring(dimName.length + 1) + }:array<${ getArrayChildren(childDim.getColName) }>" + case "struct" => s"${ + childDim.getColName.substring(dimName.length + 1) + }:struct<${ metaData.carbonTable.getChildren(childDim.getColName) + .asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) }").mkString(",") + }>" + case dType => s"${ childDim.getColName + .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }" + } + }).mkString(",") + } + + override def newInstance(): LogicalPlan = { + CarbonRelation(databaseName, tableName, metaData, tableMeta) + .asInstanceOf[this.type] + } + + val dimensionsAttr = { + val sett = new LinkedHashSet( + tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName) + .asScala.asJava) + sett.asScala.toSeq.map(dim => { + val dimval = metaData.carbonTable + .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName) + val output: DataType = dimval.getDataType + .toString.toLowerCase match { + case "array" => + CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>") + case "struct" => + CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>") + case dType => + val dataType = addDecimalScaleAndPrecision(dimval, dType) + CarbonMetastoreTypes.toDataType(dataType) + } + + AttributeReference( + dim.getColName, + output, + nullable = true)() + }) + } + + val measureAttr = { + val factTable = tableMeta.carbonTable.getFactTableName + new LinkedHashSet( + tableMeta.carbonTable. + getMeasureByTableName(tableMeta.carbonTable.getFactTableName). + asScala.asJava).asScala.toSeq + .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType( + metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString + .toLowerCase match { + case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")" + case others => others + }), + nullable = true)()) + } + + override val output = { + val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName) + .asScala + // convert each column to Attribute + columns.filter(!_.isInvisible).map { column => + if (column.isDimension()) { + val output: DataType = column.getDataType.toString.toLowerCase match { + case "array" => + CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>") + case "struct" => + CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>") + case dType => + val dataType = addDecimalScaleAndPrecision(column, dType) + CarbonMetastoreTypes.toDataType(dataType) + } + AttributeReference(column.getColName, output, nullable = true )( + qualifier = Option(tableName + "." + column.getColName)) + } else { + val output = CarbonMetastoreTypes.toDataType { + column.getDataType.toString + .toLowerCase match { + case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column + .getColumnSchema.getScale + ")" + case others => others + } + } + AttributeReference(column.getColName, output, nullable = true)( + qualifier = Option(tableName + "." + column.getColName)) + } + } + } + + def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = { + var dType = dataType + if (dimval.getDataType == DECIMAL) { + dType += + "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")" + } + dType + } + + // TODO: Use data from the footers. + override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes) + + override def equals(other: Any): Boolean = { + other match { + case p: CarbonRelation => + p.databaseName == databaseName && p.output == output && p.tableName == tableName + case _ => false + } + } + + def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = { + var dType = dataType + if (dimval.getDataType == DECIMAL) { + dType += + "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")" + } + dType + } + + private var tableStatusLastUpdateTime = 0L + + private var sizeInBytesLocalValue = 0L + + def sizeInBytes: Long = { + val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime( + tableMeta.carbonTable.getAbsoluteTableIdentifier) + + if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) { + val tablePath = CarbonStorePath.getCarbonTablePath( + tableMeta.storePath, + tableMeta.carbonTableIdentifier).getPath + val fileType = FileFactory.getFileType(tablePath) + if(FileFactory.isFileExist(tablePath, fileType)) { + tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime + sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath) + } + } + sizeInBytesLocalValue + } + +} + +object CarbonMetastoreTypes extends RegexParsers { + protected lazy val primitiveType: Parser[DataType] = + "string" ^^^ StringType | + "float" ^^^ FloatType | + "int" ^^^ IntegerType | + "tinyint" ^^^ ShortType | + "short" ^^^ ShortType | + "double" ^^^ DoubleType | + "long" ^^^ LongType | + "binary" ^^^ BinaryType | + "boolean" ^^^ BooleanType | + fixedDecimalType | + "decimal" ^^^ "decimal" ^^^ DecimalType(10, 0) | + "varchar\\((\\d+)\\)".r ^^^ StringType | + "date" ^^^ DateType | + "timestamp" ^^^ TimestampType + + protected lazy val fixedDecimalType: Parser[DataType] = + "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ { + case precision ~ scale => + DecimalType(precision.toInt, scale.toInt) + } + + protected lazy val arrayType: Parser[DataType] = + "array" ~> "<" ~> dataType <~ ">" ^^ { + case tpe => ArrayType(tpe) + } + + protected lazy val mapType: Parser[DataType] = + "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ { + case t1 ~ _ ~ t2 => MapType(t1, t2) + } + + protected lazy val structField: Parser[StructField] = + "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ { + case name ~ _ ~ tpe => StructField(name, tpe, nullable = true) + } + + protected lazy val structType: Parser[DataType] = + "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ { + case fields => StructType(fields) + } + + protected lazy val dataType: Parser[DataType] = + arrayType | + mapType | + structType | + primitiveType + + def toDataType(metastoreType: String): DataType = { + parseAll(dataType, metastoreType) match { + case Success(result, _) => result + case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType") + } + } + + def toMetastoreType(dt: DataType): String = { + dt match { + case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>" + case StructType(fields) => + s"struct<${ + fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }") + .mkString(",") + }>" + case StringType => "string" + case FloatType => "float" + case IntegerType => "int" + case ShortType => "tinyint" + case DoubleType => "double" + case LongType => "bigint" + case BinaryType => "binary" + case BooleanType => "boolean" + case DecimalType() => "decimal" + case TimestampType => "timestamp" + case DateType => "date" + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 87717fb..01bdc4f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -65,7 +65,7 @@ object AlterTableUtil { val acquiredLocks = ListBuffer[ICarbonLock]() try { locksToBeAcquired.foreach { lock => - acquiredLocks += CarbonLockUtil.getLockObject(table, lock) + acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock) } acquiredLocks.toList } catch { @@ -133,6 +133,7 @@ object AlterTableUtil { val tableName = carbonTable.getFactTableName CarbonEnv.getInstance(sparkSession).carbonMetastore .updateTableSchema(carbonTable.getCarbonTableIdentifier, + carbonTable.getCarbonTableIdentifier, thriftTable, schemaEvolutionEntry, carbonTable.getStorePath)(sparkSession) @@ -185,14 +186,18 @@ object AlterTableUtil { (sparkSession: SparkSession): Unit = { val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, - newCarbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath + val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newCarbonTableIdentifier) + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + val tableMetadataFile = carbonTablePath.getPath val fileType = FileFactory.getFileType(tableMetadataFile) if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession) - .carbonMetastore - .readSchemaFile(tableMetadataFile) + val tableInfo = if (metastore.isReadFromHiveMetaStore) { + // In case of hive metastore we first update the carbonschema inside old table only. + metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(storePath, + new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)))(sparkSession) + } else { + metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + } val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -200,14 +205,9 @@ object AlterTableUtil { FileFactory.getCarbonFile(carbonTablePath.getPath, fileType) .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR + oldTableIdentifier.table) - val tableIdentifier = new CarbonTableIdentifier(database, - oldTableIdentifier.table, - tableId) - CarbonEnv.getInstance(sparkSession).carbonMetastore.revertTableSchema(tableIdentifier, - tableInfo, - storePath)(sparkSession) - CarbonEnv.getInstance(sparkSession).carbonMetastore - .removeTableFromMetadata(database, newTableName) + val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId) + metastore.revertTableSchema(tableIdentifier, tableInfo, storePath)(sparkSession) + metastore.removeTableFromMetadata(database, newTableName) } } } @@ -222,22 +222,21 @@ object AlterTableUtil { */ def revertAddColumnChanges(dbName: String, tableName: String, timeStamp: Long) (sparkSession: SparkSession): Unit = { - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + val carbonTable = metastore .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta .carbonTable val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore - .readSchemaFile(tableMetadataFile) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { LOGGER.info(s"Reverting changes for $dbName.$tableName") val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added thriftTable.fact_table.table_columns.removeAll(addedSchemas) - CarbonEnv.getInstance(sparkSession).carbonMetastore + metastore .revertTableSchema(carbonTable.getCarbonTableIdentifier, thriftTable, carbonTable.getStorePath)(sparkSession) } @@ -253,14 +252,13 @@ object AlterTableUtil { */ def revertDropColumnChanges(dbName: String, tableName: String, timeStamp: Long) (sparkSession: SparkSession): Unit = { - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + val carbonTable = metastore .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta .carbonTable val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore - .readSchemaFile(tableMetadataFile) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -273,7 +271,7 @@ object AlterTableUtil { } } } - CarbonEnv.getInstance(sparkSession).carbonMetastore + metastore .revertTableSchema(carbonTable.getCarbonTableIdentifier, thriftTable, carbonTable.getStorePath)(sparkSession) } @@ -289,14 +287,13 @@ object AlterTableUtil { */ def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long) (sparkSession: SparkSession): Unit = { - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + val carbonTable = metastore .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta .carbonTable val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore - .readSchemaFile(tableMetadataFile) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -312,7 +309,7 @@ object AlterTableUtil { } } } - CarbonEnv.getInstance(sparkSession).carbonMetastore + metastore .revertTableSchema(carbonTable.getCarbonTableIdentifier, thriftTable, carbonTable.getStorePath)(sparkSession) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala index 74e11f1..645081f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.hive.CarbonRelation import org.apache.carbondata.api.CarbonStore @@ -30,8 +31,9 @@ object CleanFiles { def cleanFiles(spark: SparkSession, dbName: String, tableName: String, storePath: String): Unit = { TableAPIUtil.validateTableExists(spark, dbName, tableName) - val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore - .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore. + lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation]. + tableMeta.carbonTable CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala index 3dffb42..f67a5ce 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala @@ -16,7 +16,8 @@ */ package org.apache.spark.util -import org.apache.spark.sql.{CarbonEnv, SparkSession} + import org.apache.spark.sql.{CarbonEnv, SparkSession} + import org.apache.spark.sql.hive.CarbonRelation import org.apache.carbondata.api.CarbonStore @@ -29,8 +30,9 @@ object DeleteSegmentByDate { def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String, dateValue: String): Unit = { TableAPIUtil.validateTableExists(spark, dbName, tableName) - val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore - .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore. + lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation]. + tableMeta.carbonTable CarbonStore.deleteLoadByDate(dateValue, dbName, tableName, carbonTable) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala index 35afa28..bbf386e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala @@ -17,6 +17,7 @@ package org.apache.spark.util import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.hive.CarbonRelation import org.apache.carbondata.api.CarbonStore @@ -33,8 +34,9 @@ object DeleteSegmentById { def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String, segmentIds: Seq[String]): Unit = { TableAPIUtil.validateTableExists(spark, dbName, tableName) - val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore - .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore. + lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation]. + tableMeta.carbonTable CarbonStore.deleteLoadById(segmentIds, dbName, tableName, carbonTable) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala index 07dfcc1..d5788ba 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala @@ -21,6 +21,7 @@ import java.text.SimpleDateFormat import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.hive.CarbonRelation import org.apache.carbondata.api.CarbonStore @@ -30,8 +31,9 @@ object ShowSegments { def showSegments(spark: SparkSession, dbName: String, tableName: String, limit: Option[String]): Seq[Row] = { TableAPIUtil.validateTableExists(spark, dbName, tableName) - val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore - .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore. + lookupRelation(Some(dbName), tableName)(spark).asInstanceOf[CarbonRelation]. + tableMeta.carbonTable CarbonStore.showSegments(dbName, tableName, limit, carbonTable.getMetaDataFilepath) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala index c584fc7..aa5b87d 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala @@ -23,7 +23,9 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.processing.constants.TableOptionConstant import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel} @@ -62,6 +64,14 @@ class AllDictionaryTestCase extends QueryTest with BeforeAndAfterAll { CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) + // Create table and metadata folders if not exist + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier) + val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath + val fileType = FileFactory.getFileType(metadataDirectoryPath) + if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { + FileFactory.mkdirs(metadataDirectoryPath, fileType) + } carbonLoadModel } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index 9c576ec..549587f 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -25,7 +25,9 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.processing.constants.TableOptionConstant import org.apache.carbondata.processing.etl.DataLoadingException import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel} @@ -175,6 +177,14 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) carbonLoadModel.setMaxColumns("100") + // Create table and metadata folders if not exist + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier) + val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath + val fileType = FileFactory.getFileType(metadataDirectoryPath) + if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { + FileFactory.mkdirs(metadataDirectoryPath, fileType) + } carbonLoadModel } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala index 2995e60..1830ed6 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala @@ -195,6 +195,7 @@ class CarbonDataSourceSuite extends QueryTest with BeforeAndAfterAll { } test("test create table with complex datatype") { + sql("drop table if exists create_source") sql("create table create_source(intField int, stringField string, complexField array<string>) USING org.apache.spark.sql.CarbonSource OPTIONS('tableName'='create_source')") sql("drop table create_source") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index 8cdcd26..1788ccb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -256,19 +256,18 @@ public class CarbonCompactionUtil { /** * This will check if any compaction request has been received for any table. * - * @param tableMetas + * @param carbonTables * @return */ - public static TableMeta getNextTableToCompact(TableMeta[] tableMetas, + public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables, List<CarbonTableIdentifier> skipList) { - for (TableMeta table : tableMetas) { - CarbonTable ctable = table.carbonTable; + for (CarbonTable ctable : carbonTables) { String metadataPath = ctable.getMetaDataFilepath(); // check for the compaction required file and at the same time exclude the tables which are // present in the skip list. if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList - .contains(table.carbonTableIdentifier)) { - return table; + .contains(ctable.getCarbonTableIdentifier())) { + return ctable; } } return null;
