http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 31a08fc..9afb890 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CacheClient
 import org.apache.spark.sql.test.util.QueryTest
@@ -112,11 +112,9 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
       "ignoremajor"
     )
-    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val carbontablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-      .getMetadataDirectoryPath
-    val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
+    val carbonTablePath = carbonTable.getMetadataPath
+    val segs = SegmentStatusManager.readLoadMetadata(carbonTablePath)
     // status should remain as compacted.
     assertResult(SegmentStatus.COMPACTED)(segs(3).getSegmentStatus)
@@ -134,9 +132,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
       "ignoremajor"
     )
-    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val carbontablePath = CarbonStorePath
-      .getCarbonTablePath(absoluteTableIdentifier).getMetadataDirectoryPath
+    val carbontablePath = carbonTable.getMetadataPath
     val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
     // status should remain as compacted for segment 2.
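For reference, the pattern repeated throughout this commit: path objects built via CarbonStorePath are replaced by direct lookups on CarbonTable or static helpers on CarbonTablePath. A minimal before/after sketch, grounded in the calls above (table and database names are illustrative):

    import org.apache.carbondata.core.metadata.CarbonMetadata
    import org.apache.carbondata.core.statusmanager.SegmentStatusManager
    import org.apache.carbondata.core.util.path.CarbonTablePath

    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "ignoremajor")
    // before: CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
    //           .getMetadataDirectoryPath
    // after: either of these two equivalent forms
    val metadataPath = carbonTable.getMetadataPath
    val sameMetadataPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
    val loads = SegmentStatusManager.readLoadMetadata(metadataPath)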
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index 42ac4df..68a3058 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath

 class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
   var filePath: String = _
@@ -193,8 +193,7 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
       tableName
     )
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo)
+    val segmentDir = carbonTable.getSegmentPath(segmentNo)
     new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index db0a62c..b9d8e12 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -22,7 +22,7 @@ import java.io.{File, FilenameFilter}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.reader.CarbonIndexFileReader
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -48,8 +48,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table test_table_v3")
     val indexReader = new CarbonIndexFileReader()
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3")
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0")
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
     val carbonIndexPaths = new File(segmentDir)
       .listFiles(new FilenameFilter {
         override def accept(dir: File, name: String): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 479db50..cbbb191 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -32,8 +32,8 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.core.util.path.CarbonTablePath

 class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   var filePath: String = s"$resourcesPath/globalsort"
@@ -271,8 +271,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     }
     sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getSegmentDir("0")
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
     assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
   }
@@ -378,8 +377,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo

   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
     new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }
 }
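All of the segment-directory lookups in these tests collapse to the same static helper. A sketch of the new form, using only calls shown above (segment "0" and the table name are illustrative):

    import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
    import org.apache.carbondata.core.metadata.CarbonMetadata
    import org.apache.carbondata.core.util.path.CarbonTablePath

    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
    // before: CarbonStorePath.getCarbonTablePath(...).getCarbonDataDirectoryPath("0")
    //         or .getSegmentDir("0")
    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
    val indexFileCount = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()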
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index ed58253..7c82f75 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.time.DateUtils
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll

-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
@@ -79,8 +79,8 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
       "dataRetentionTable"
     )
     absoluteTableIdentifierForRetention = carbonTable2.getAbsoluteTableIdentifier
-    carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(absoluteTableIdentifierForRetention).getMetadataDirectoryPath
+    carbonTablePath = CarbonTablePath
+      .getMetadataPath(absoluteTableIdentifierForRetention.getTablePath)
     carbonTableStatusLock = CarbonLockFactory
       .getCarbonLockObj(absoluteTableIdentifierForLock, LockUsage.TABLE_STATUS_LOCK)
     carbonDeleteSegmentLock= CarbonLockFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 0a21aed..e5de8da 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -61,9 +61,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
   def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
+    val segmentDir = carbonTable.getSegmentPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index c8f7be3..2ce46ef 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -72,8 +72,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
   def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
     val partitions = CarbonFilters
       .getPartitions(Seq.empty,
         sqlContext.sparkSession,
@@ -334,9 +332,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE mergeindexpartitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val details = SegmentStatusManager.readTableStatusFile(tablePath.getTableStatusFilePath)
+    val details = SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
     val store = new SegmentFileStore(carbonTable.getTablePath, details(0).getSegmentFile)
     store.readIndexFiles()
     store.getIndexFiles

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 5fc7e3d..8adcb11 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -177,7 +177,7 @@ object CarbonStore {
     LOGGER.audit(s"Delete segment by Id request has been received for $dbName.$tableName")
     validateLoadIds(loadids)

-    val path = carbonTable.getMetaDataFilepath
+    val path = carbonTable.getMetadataPath

     try {
       val invalidLoadIds = SegmentStatusManager.updateDeletionStatus(
@@ -203,7 +203,7 @@ object CarbonStore {
     LOGGER.audit(s"Delete segment by Id request has been received for $dbName.$tableName")
     val time = validateTimeFormat(timestamp)

-    val path = carbonTable.getMetaDataFilepath
+    val path = carbonTable.getMetadataPath

    try {
      val invalidLoadTimestamps =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
index 32d121e..3dd9903 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
@@ -30,7 +30,7 @@ trait ColumnValidator {
  */
 trait DictionaryDetailService {
   def getDictionaryDetail(dictFolderPath: String, primDimensions: Array[CarbonDimension],
-      table: CarbonTableIdentifier, storePath: String): DictionaryDetail
+      tablePath: String): DictionaryDetail
 }

 /**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
index e861a8c..dbf47ab 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
@@ -23,12 +23,11 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath

 class DictionaryDetailHelper extends DictionaryDetailService {
-  def getDictionaryDetail(dictfolderPath: String, primDimensions: Array[CarbonDimension],
-      table: CarbonTableIdentifier, storePath: String): DictionaryDetail = {
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, table)
+  override def getDictionaryDetail(dictfolderPath: String, primDimensions: Array[CarbonDimension],
+      tablePath: String): DictionaryDetail = {
     val dictFilePaths = new Array[String](primDimensions.length)
     val dictFileExists = new Array[Boolean](primDimensions.length)
     val columnIdentifier = new Array[ColumnIdentifier](primDimensions.length)
@@ -50,7 +49,7 @@ class DictionaryDetailHelper extends DictionaryDetailService {
     // 3 lookup fileNamesMap, if file name is in fileNamesMap, file is exists, or not.
     primDimensions.zipWithIndex.foreach { f =>
       columnIdentifier(f._2) = f._1.getColumnIdentifier
-      dictFilePaths(f._2) = carbonTablePath.getDictionaryFilePath(f._1.getColumnId)
+      dictFilePaths(f._2) = CarbonTablePath.getDictionaryFilePath(tablePath, f._1.getColumnId)
       dictFileExists(f._2) =
         fileNamesMap.get(CarbonTablePath.getDictionaryFileName(f._1.getColumnId)) match {
           case None => false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index f2f4ecd..56a66b9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil

 /**
@@ -49,7 +49,7 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
  */
 class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
-    absoluteTableIdentifier: AbsoluteTableIdentifier)
+    identifier: AbsoluteTableIdentifier)
   extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {

   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
@@ -70,8 +70,6 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
         // create dictionary file if it is a dictionary column
         if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
             !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-          val carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier)
           var rawData: String = null
           if (null != columnSchema.getDefaultValue) {
             rawData = new String(columnSchema.getDefaultValue,
@@ -79,16 +77,15 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
           }
           CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE, lockType)
           // Create table and metadata folders if not exist
-          val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+          val metadataDirectoryPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
           val fileType = FileFactory.getFileType(metadataDirectoryPath)
           if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
             FileFactory.mkdirs(metadataDirectoryPath, fileType)
           }
-          GlobalDictionaryUtil
-            .loadDefaultDictionaryValueForNewColumn(carbonTablePath,
-              columnSchema,
-              absoluteTableIdentifier,
-              rawData)
+          GlobalDictionaryUtil.loadDefaultDictionaryValueForNewColumn(
+            columnSchema,
+            identifier,
+            rawData)
         }
       } catch {
         case ex: Exception =>
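The DictionaryDetailService signature above loses the CarbonTableIdentifier/storePath pair in favour of the bare table path, and per-column dictionary files are resolved through static helpers. A sketch of the reworked calls, using only methods shown in this diff (dictFolderPath, primDimensions and dimension stand for values the caller already holds):

    val dictDetail = CarbonSparkFactory.getDictionaryDetailService
      .getDictionaryDetail(dictFolderPath, primDimensions, carbonTable.getTablePath)
    // per-column dictionary file, now via the static helper:
    val dictFile = CarbonTablePath.getDictionaryFilePath(carbonTable.getTablePath, dimension.getColumnId)
    val dictFileName = CarbonTablePath.getDictionaryFileName(dimension.getColumnId)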
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index db29532..7acf4e2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -40,10 +40,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.service.{CarbonCommonFactory, PathService}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -348,10 +346,6 @@ class CarbonGlobalDictionaryGenerateRDD(
           model.table,
           model.columnIdentifier(split.index),
           model.columnIdentifier(split.index).getDataType)
-        val pathService: PathService = CarbonCommonFactory.getPathService
-        val carbonTablePath: CarbonTablePath =
-          pathService
-            .getCarbonTablePath(model.table, dictionaryColumnUniqueIdentifier)
         if (StringUtils.isNotBlank(model.hdfsTempLocation)) {
           CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
             model.hdfsTempLocation)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 94668bd..7815c99 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.comparator.Comparator
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -635,7 +635,7 @@
   def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
-    val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
+    val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
     val details = SegmentStatusManager.readLoadMetadata(metadataPath)
     model.setLoadMetadataDetails(new util.ArrayList[LoadMetadataDetails](details.toList.asJava))
   }
@@ -866,20 +866,18 @@ object CommonUtil {
     val fileType = FileFactory.getFileType(databaseLocation)
     if (FileFactory.isFileExist(databaseLocation, fileType)) {
       val file = FileFactory.getCarbonFile(databaseLocation, fileType)
-      if (file.isDirectory) {
-        val tableFolders = file.listFiles()
-        tableFolders.foreach { tableFolder =>
-          if (tableFolder.isDirectory) {
-            val tablePath = databaseLocation +
-              CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
-            val identifier =
-              AbsoluteTableIdentifier.from(tablePath, dbName, tableFolder.getName)
-            val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
-            val tableStatusFile = carbonTablePath.getTableStatusFilePath
-            if (FileFactory.isFileExist(tableStatusFile, fileType)) {
-              val segmentStatusManager = new SegmentStatusManager(identifier)
-              val carbonLock = segmentStatusManager.getTableStatusLock
-              try {
+      if (file.isDirectory) {
+        val tableFolders = file.listFiles()
+        tableFolders.foreach { tableFolder =>
+          if (tableFolder.isDirectory) {
+            val tablePath = databaseLocation +
+              CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+            val identifier =
+              AbsoluteTableIdentifier.from(tablePath, dbName, tableFolder.getName)
+            val tableStatusFile =
+              CarbonTablePath.getTableStatusFilePath(tablePath)
+            if (FileFactory.isFileExist(tableStatusFile, fileType)) {
+              try {
                 val carbonTable = CarbonMetadata.getInstance
                   .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
                 DataLoadingUtil.deleteLoadsAndUpdateMetadata(
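The reworked cleanup loop above now derives the table status file straight from the folder layout, without a SegmentStatusManager or path object. A condensed sketch of that flow (databaseLocation, dbName, tableFolder and fileType stand for the values already in scope there):

    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
    val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableFolder.getName)
    val tableStatusFile = CarbonTablePath.getTableStatusFilePath(tablePath)
    if (FileFactory.isFileExist(tableStatusFile, fileType)) {
      // only tables with a status file are considered for deleteLoadsAndUpdateMetadata
    }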
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 6767ef7..cee40c8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -372,7 +372,7 @@ object DataLoadingUtil {
       isForceDeletion: Boolean,
       carbonTable: CarbonTable,
       specs: util.List[PartitionSpec]): Unit = {
-    if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
+    if (isLoadDeletionRequired(carbonTable.getMetadataPath)) {
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       val (details, updationRequired) =
@@ -406,7 +406,7 @@ object DataLoadingUtil {
           }
           // read latest table status again.
           val latestMetadata = SegmentStatusManager
-            .readLoadMetadata(carbonTable.getMetaDataFilepath)
+            .readLoadMetadata(carbonTable.getMetadataPath)

           // update the metadata details from old to new status.
           val latestStatus = CarbonLoaderUtil
@@ -433,7 +433,7 @@ object DataLoadingUtil {
       if (updationCompletionStaus) {
         DeleteLoadFolders
           .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier,
-            carbonTable.getMetaDataFilepath, isForceDeletion, specs)
+            carbonTable.getMetadataPath, isForceDeletion, specs)
       }
     }
   }
@@ -442,14 +442,14 @@ object DataLoadingUtil {
   private def isUpdationRequired(isForceDeletion: Boolean,
       carbonTable: CarbonTable,
       absoluteTableIdentifier: AbsoluteTableIdentifier) = {
-    val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
+    val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
     // Delete marked loads
     val isUpdationRequired =
       DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
         absoluteTableIdentifier,
         isForceDeletion,
         details,
-        carbonTable.getMetaDataFilepath
+        carbonTable.getMetadataPath
       )
     (details, isUpdationRequired)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 9e1ece7..2bd4f45 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.reader.CarbonDictionaryReader
 import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
@@ -308,7 +308,7 @@ object GlobalDictionaryUtil {
     }
     val primDimensions = primDimensionsBuffer.map { x => x }.toArray
     val dictDetail = CarbonSparkFactory.getDictionaryDetailService.
-      getDictionaryDetail(dictFolderPath, primDimensions, table, carbonLoadModel.getTablePath)
+      getDictionaryDetail(dictFolderPath, primDimensions, carbonLoadModel.getTablePath)
     val dictFilePaths = dictDetail.dictFilePaths
     val dictFileExists = dictDetail.dictFileExists
     val columnIdentifier = dictDetail.columnIdentifiers
@@ -398,10 +398,6 @@ object GlobalDictionaryUtil {
     }
   }

-  // Hack for spark2 integration
-  var updateTableMetadataFunc: (CarbonLoadModel, SQLContext, DictionaryLoadModel,
-      Array[CarbonDimension]) => Unit = _
-
   /**
    * check whether global dictionary have been generated successfully or not
    *
@@ -705,10 +701,7 @@ object GlobalDictionaryUtil {
     try {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
-      // create dictionary folder if not exists
-      val tablePath = carbonLoadModel.getTablePath
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
-      val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
+      val dictfolderPath = CarbonTablePath.getMetadataPath(carbonLoadModel.getTablePath)
       // columns which need to generate global dictionary file
       val dimensions = carbonTable.getDimensionByTableName(
         carbonTable.getTableName).asScala.toArray
@@ -845,12 +838,11 @@ object GlobalDictionaryUtil {
   * This method will write dictionary file, sortindex file and dictionary meta for new dictionary
   * column with default value
   *
-  * @param carbonTablePath
   * @param columnSchema
   * @param absoluteTableIdentifier
   * @param defaultValue
   */
-  def loadDefaultDictionaryValueForNewColumn(carbonTablePath: CarbonTablePath,
+  def loadDefaultDictionaryValueForNewColumn(
      columnSchema: ColumnSchema,
      absoluteTableIdentifier: AbsoluteTableIdentifier,
      defaultValue: String): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 20d3032..71ce2c6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -33,12 +33,14 @@ import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
 import org.apache.carbondata.core.service.CarbonCommonFactory
+import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -208,8 +210,7 @@ class AlterTableColumnSchemaGenerator(
     alterTableModel: AlterTableAddColumnsModel,
     dbName: String,
     tableInfo: TableInfo,
-    carbonTablePath: CarbonTablePath,
-    tableIdentifier: CarbonTableIdentifier,
+    tableIdentifier: AbsoluteTableIdentifier,
     sc: SparkContext) {

   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -370,7 +371,7 @@ object TableNewProcessor {
       encoders.add(Encoding.DIRECT_DICTIONARY)
     }
     columnSchema.setEncodingList(encoders)
-    val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+    val colUniqueIdGenerator = ColumnUniqueIdGenerator.getInstance
     val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
     columnSchema.setColumnUniqueId(columnUniqueId)
     columnSchema.setColumnReferenceId(columnUniqueId)
@@ -434,7 +435,7 @@ class TableNewProcessor(cm: TableModel) {
       }
     }
     columnSchema.setEncodingList(encoders)
-    val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+    val colUniqueIdGenerator = ColumnUniqueIdGenerator.getInstance
     val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
     columnSchema.setColumnUniqueId(columnUniqueId)
     columnSchema.setColumnReferenceId(columnUniqueId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 3c871db..1656efa 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -176,8 +176,6 @@ object PartitionUtils {
     getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIds,
       partitionInfo, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable).asScala
     val pathList: util.List[String] = new util.ArrayList[String]()
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
-    val carbonTablePath = new CarbonTablePath(carbonTableIdentifier, tablePath)
     tableBlockInfoList.foreach{ tableBlockInfo =>
       val path = tableBlockInfo.getFilePath
       val timestamp = CarbonTablePath.DataFileUtil.getTimeStampFromFileName(path)
@@ -190,8 +188,8 @@ object PartitionUtils {
       val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
       val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
       val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
-      val indexFilePath = carbonTablePath.getCarbonIndexFilePath(
-        String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
+      val indexFilePath = CarbonTablePath.getCarbonIndexFilePath(
+        tablePath, String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
         timestamp, version)
       // indexFilePath could be duplicated when multiple data file related to one index file
       if (indexFilePath != null && !pathList.contains(indexFilePath)) {
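PartitionUtils now passes the raw table path into the static index-file helper, with the remaining arguments extracted from the data file name by CarbonTablePath.DataFileUtil as shown above. A sketch of the call shape, for reference (all names stand for values already in scope in that loop):

    val indexFilePath = CarbonTablePath.getCarbonIndexFilePath(
      tablePath, String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
      timestamp, version)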
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index c8a6b1d..4266c51 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.OperationContext
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
@@ -74,7 +74,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
           "true")
         loadCommand.processData(sqlContext.sparkSession)
         val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
-          carbonTable.getMetaDataFilepath, uuid)
+          carbonTable.getMetadataPath, uuid)
         val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
           case load if loadMetaDataDetails.contains(load) =>
             load.setMergedLoadName(mergedLoadName)
@@ -83,11 +83,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
             load
           case other => other
         }
-        val carbonTablePath = CarbonStorePath
-          .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-            .getAbsoluteTableIdentifier)
-        SegmentStatusManager
-          .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePathWithUUID(uuid),
+        SegmentStatusManager.writeLoadDetailsIntoFile(
+          CarbonTablePath.getTableStatusFilePathWithUUID(carbonTable.getTablePath, uuid),
             updatedLoadMetaDataDetails)
         carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
       } finally {
@@ -108,11 +105,9 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
       // 4. Therefore tablestatus file will be committed in between multiple commits.
       if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) {
         if (!identifySegmentsToBeMerged().isEmpty) {
-          val carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-              .getAbsoluteTableIdentifier)
-          val uuidTableStaus = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
-          val tableStatus = carbonTablePath.getTableStatusFilePath
+          val uuidTableStaus = CarbonTablePath.getTableStatusFilePathWithUUID(
+            carbonTable.getTablePath, uuid)
+          val tableStatus = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
           if (!uuidTableStaus.equalsIgnoreCase(tableStatus)) {
             FileFactory.getCarbonFile(uuidTableStaus).renameForce(tableStatus)
           }
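The compactor resolves both table-status variants statically; the UUID-suffixed file written during the compaction is renamed over the plain one on commit. A condensed sketch of that commit step (uuid is the value threaded through the compaction; the local names are illustrative):

    val uuidStatus = CarbonTablePath.getTableStatusFilePathWithUUID(carbonTable.getTablePath, uuid)
    val plainStatus = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
    if (!uuidStatus.equalsIgnoreCase(plainStatus)) {
      // atomically promote the UUID table status to the committed one
      FileFactory.getCarbonFile(uuidStatus).renameForce(plainStatus)
    }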
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index add038d..349c436 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -59,7 +59,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
@@ -72,7 +72,7 @@ import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonData
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}

 /**
  * This is the factory class which can create different RDD depends on user needs.
@@ -127,7 +127,7 @@ object CarbonDataRDDFactory {
         LOGGER.error("Not able to acquire the compaction lock for table " +
           s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         CarbonCompactionUtil
-          .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+          .createCompactionRequiredFile(carbonTable.getMetadataPath, compactionType)
         // throw exception only in case of DDL trigger.
         if (compactionModel.isDDLTrigger) {
           CarbonException.analysisException(
@@ -195,7 +195,7 @@ object CarbonDataRDDFactory {
                   s"${ tableForCompaction.getDatabaseName }." +
                   s"${ tableForCompaction.getTableName}")
                 val table: CarbonTable = tableForCompaction
-                val metadataPath = table.getMetaDataFilepath
+                val metadataPath = table.getMetadataPath
                 val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)

                 val newCarbonLoadModel = prepareCarbonLoadModel(table)
@@ -580,15 +580,13 @@ object CarbonDataRDDFactory {
           (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
       val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-        carbonTable.getMetaDataFilepath)
+        carbonTable.getMetadataPath)
         .filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
                        lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS))
       val segmentIds = loadMetadataDetails.map(_.getLoadName)
       val segmentIdIndex = segmentIds.zipWithIndex.toMap
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getTablePath,
-        carbonTable.getCarbonTableIdentifier)
       val segmentId2maxTaskNo = segmentIds.map { segId =>
-        (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath))
+        (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonLoadModel.getTablePath))
       }.toMap

       class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a4fa37a..1210b92 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -139,7 +139,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge)
     val carbonMergerMapping = CarbonMergerMapping(
       tablePath,
-      carbonTable.getMetaDataFilepath,
+      carbonTable.getMetadataPath,
       mergedLoadName,
       databaseName,
       factTableName,
@@ -151,7 +151,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       currentPartitions = partitions)
     carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
     carbonLoadModel.setLoadMetadataDetails(
-      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath).toList.asJava)
     // trigger event for compaction
     val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
       AlterTableCompactionPreEvent(sqlContext.sparkSession,
@@ -237,11 +237,11 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         ((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
          CarbonDataMergerUtil
            .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
-             carbonTable.getMetaDataFilepath,
+             carbonTable.getMetadataPath,
              carbonLoadModel)) ||
         CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
           loadsToMerge,
-          carbonTable.getMetaDataFilepath,
+          carbonTable.getMetadataPath,
           mergedLoadNumber,
           carbonLoadModel,
           compactionType,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 9b9ca0e..acc3358 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -34,12 +34,12 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -292,9 +292,10 @@ case class CarbonAlterTableCompactionCommand(
           true)(sparkSession,
           sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
         // 5. remove checkpoint
-        val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-        FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingCheckpointDir))
-        FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingLogDir))
+        FileFactory.deleteAllFilesOfDir(
+          new File(CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)))
+        FileFactory.deleteAllFilesOfDir(
+          new File(CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)))
       } else {
         val msg = "Failed to close streaming table, because streaming is locked for table " +
                   carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index fc8e975..a42031d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -60,7 +60,9 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
@@ -258,8 +260,7 @@ case class CarbonLoadDataCommand(
         carbonLoadModel.setUseOnePass(false)
       }
       // Create table and metadata folders if not exist
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-      val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+      val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
       val fileType = FileFactory.getFileType(metadataDirectoryPath)
       if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
         FileFactory.mkdirs(metadataDirectoryPath, fileType)
@@ -354,9 +355,7 @@ case class CarbonLoadDataCommand(
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       .getCarbonTableIdentifier
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
-    val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+    val dictFolderPath = CarbonTablePath.getMetadataPath(carbonLoadModel.getTablePath)
     val dimensions = carbonTable.getDimensionByTableName(
       carbonTable.getTableName).asScala.toArray
     val colDictFilePath = carbonLoadModel.getColDictFilePath
@@ -1041,4 +1040,5 @@ case class CarbonLoadDataCommand(
     val dataFrameWithTupleId = dataFrame.get.select(fieldWithTupleId: _*)
     (dataFrameWithTupleId)
   }
+
 }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index f8f215f..1e5885e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -45,7 +45,7 @@ case class CarbonShowLoadsCommand(
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     CarbonStore.showSegments(
       limit,
-      carbonTable.getMetaDataFilepath
+      carbonTable.getMetadataPath
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index e3e4c7a..a8316e9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
 import org.apache.carbondata.hadoop.util.SchemaReader
@@ -63,19 +63,18 @@ case class RefreshCarbonTableCommand(
       // 2.2.1 validate that all the aggregate tables are copied at the store location.
       // 2.2.2 Register the aggregate tables
       val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
-      val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
+      val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
       // 2.1 check if the table already register with hive then ignore and continue with the next
       // schema
       if (!sparkSession.sessionState.catalog.listTables(databaseName)
        .exists(_.table.equalsIgnoreCase(tableName))) {
-        val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
        // check the existence of the schema file to know its a carbon table
-        val schemaFilePath = carbonTablePath.getSchemaFilePath
+        val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
        // if schema file does not exist then the table will either non carbon table or stale
        // carbon table
        if (FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))) {
          // read TableInfo
-          val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
+          val tableInfo = SchemaReader.getTableInfo(identifier)
          // 2.2 register the table with the hive check if the table being registered has
          // aggregate table then do the below steps
          // 2.2.1 validate that all the aggregate tables are copied at the store location.
@@ -99,7 +98,7 @@ case class RefreshCarbonTableCommand(
             // Register partitions to hive metastore in case of hive partitioning carbon table
             if (tableInfo.getFactTable.getPartitionInfo != null &&
                 tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
-              registerAllPartitionsToHive(absoluteTableIdentifier, sparkSession)
+              registerAllPartitionsToHive(identifier, sparkSession)
             }
           } else {
             LOGGER.audit(
@@ -178,9 +177,7 @@ case class RefreshCarbonTableCommand(
     dataMapSchemaList.asScala.foreach(dataMap => {
       val tableName = dataMap.getChildSchema.getTableName
       val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath,
-        new CarbonTableIdentifier(dbName, tableName, dataMap.getChildSchema.getTableId))
-      val schemaFilePath = carbonTablePath.getSchemaFilePath
+      val schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath)
       try {
         fileExist = FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))
       } catch {
@@ -191,7 +188,7 @@ case class RefreshCarbonTableCommand(
         return fileExist;
       }
     })
-    return true
+    true
   }

   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 25d5e91..10d55ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.processing.exception.MultipleMatchingException
@@ -68,12 +68,11 @@ object DeleteExecution {
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
     val isPartitionTable = carbonTable.isHivePartitionTable
     val factPath = if (isPartitionTable) {
-      carbonTablePath.getPath
+      absoluteTableIdentifier.getTablePath
     } else {
-      carbonTablePath.getFactDir
+      CarbonTablePath.getFactDir(absoluteTableIdentifier.getTablePath)
     }
     var segmentsTobeDeleted = Seq.empty[Segment]
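DeleteExecution's fact-path resolution thus reduces to a two-way choice on the raw table path. A sketch of the new logic, using only calls shown above:

    val tablePath = carbonTable.getAbsoluteTableIdentifier.getTablePath
    val factPath = if (carbonTable.isHivePartitionTable) {
      tablePath // partition tables keep their data under the table root
    } else {
      CarbonTablePath.getFactDir(tablePath) // non-partition tables use the Fact directory
    }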
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 38ac58e..c1f86ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -39,7 +39,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.partition.DropPartitionCallable
@@ -65,8 +64,8 @@ case class CarbonAlterTableDropPartitionCommand(
     if (relation == null || CarbonMetadata.getInstance.getCarbonTable(dbName, tableName) == null) {
       throwMetadataException(dbName, tableName, "table not found")
     }
-    val table = relation.carbonTable
-    val partitionInfo = table.getPartitionInfo(tableName)
+    val carbonTable = relation.carbonTable
+    val partitionInfo = carbonTable.getPartitionInfo(tableName)
     if (partitionInfo == null) {
       throwMetadataException(dbName, tableName, "table is not a partition table")
     }
@@ -92,10 +91,9 @@ case class CarbonAlterTableDropPartitionCommand(
         "Dropping range interval partition is unsupported")
     }
     partitionInfo.dropPartition(partitionIndex)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-    // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    // read TableInfo
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)
@@ -108,11 +106,11 @@ case class CarbonAlterTableDropPartitionCommand(
     thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
       .setTime_stamp(System.currentTimeMillis)
     carbonMetaStore.updateTableSchemaForAlter(
-      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
       thriftTable,
       null,
-      table.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+      carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     // update the schema modified time
     carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     // sparkSession.catalog.refreshTable(tableName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 78cf2b8..efd1216 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -41,7 +41,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.partition.SplitPartitionCallable
@@ -72,8 +71,8 @@ case class CarbonAlterTableSplitPartitionCommand(
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       throwMetadataException(dbName, tableName, "table not found")
     }
-    val table = relation.carbonTable
-    val partitionInfo = table.getPartitionInfo(tableName)
+    val carbonTable = relation.carbonTable
+    val partitionInfo = carbonTable.getPartitionInfo(tableName)
     val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
     // keep a copy of partitionIdList before update partitionInfo.
     // will be used in partition data scan
@@ -88,9 +87,8 @@ case class CarbonAlterTableSplitPartitionCommand(
     updatePartitionInfo(partitionInfo, partitionIds)

-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
     // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)
@@ -100,12 +98,12 @@ case class CarbonAlterTableSplitPartitionCommand(
     wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
     val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo,
       dbName, tableName)
-    carbonMetaStore
-      .updateTableSchemaForAlter(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        thriftTable,
-        null,
-        table.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+    carbonMetaStore.updateTableSchemaForAlter(
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      thriftTable,
+      null,
+      carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     // update the schema modified time
     carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     Seq.empty
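Both partition commands now fetch the thrift TableInfo through the metastore overload that takes the CarbonTable itself, so no path object has to be materialized first. A sketch of the call shape (the wrapper object is illustrative; the metastore lookup mirrors the one used elsewhere in this patch):

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.format.TableInfo

    object ThriftTableInfoSketch {
      // The metastore resolves the schema file location internally from the table.
      def readThriftTableInfo(carbonTable: CarbonTable)
        (sparkSession: SparkSession): TableInfo = {
        val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
        metastore.getThriftTableInfo(carbonTable)(sparkSession)
      }
    }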
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 59c43aa..4d0a4c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -184,7 +184,7 @@ case class CreatePreAggregateTableCommand(
       CarbonFilters.getCurrentPartitions(sparkSession,
         TableIdentifier(parentTable.getTableName,
           Some(parentTable.getDatabaseName))).map(_.asJava).orNull)
-    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
+    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
     if (loadAvailable.exists(load =>
       load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
       load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
       throw new UnsupportedOperationException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 657e0c5..11f451b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -290,13 +290,12 @@ object CommitPreAggregateListener extends OperationEventListener {
     // keep committing until one fails
     val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
       val childCarbonTable = childLoadCommand.table
-      val carbonTablePath =
-        new CarbonTablePath(childCarbonTable.getCarbonTableIdentifier,
-          childCarbonTable.getTablePath)
       // Generate table status file name with UUID, forExample: tablestatus_1
-      val oldTableSchemaPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
+      val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+        childCarbonTable.getTablePath, uuid)
       // Generate table status file name without UUID, forExample: tablestatus
-      val newTableSchemaPath = carbonTablePath.getTableStatusFilePath
+      val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+        childCarbonTable.getTablePath)
       renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
     }
     // if true then the commit for one of the child tables has failed
@@ -306,11 +305,10 @@ object CommitPreAggregateListener extends OperationEventListener {
       renamedDataMaps.foreach { loadCommand =>
         val carbonTable = loadCommand.table
-        val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-          carbonTable.getTablePath)
         // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
-        val backupTableSchemaPath = carbonTablePath.getTableStatusFilePath + "_backup_" + uuid
-        val tableSchemaPath = carbonTablePath.getTableStatusFilePath
+        val backupTableSchemaPath =
+          CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
+        val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
         markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, loadCommand)
         renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
       }
@@ -377,9 +375,8 @@ object CommitPreAggregateListener extends OperationEventListener {
       operationContext: OperationContext, uuid: String): Unit = {
     childTables.foreach { childTable =>
-      val carbonTablePath = new CarbonTablePath(childTable.getCarbonTableIdentifier,
-        childTable.getTablePath)
-      val metaDataDir = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath)
+      val metaDataDir = FileFactory.getCarbonFile(
+        CarbonTablePath.getMetadataPath(childTable.getTablePath))
       val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
         override def accept(file: CarbonFile): Boolean = {
           file.getName.contains(uuid) || file.getName.contains("backup")
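The commit listener above juggles three tablestatus flavors: the UUID-suffixed file staged by a child load, the plain committed file, and a _backup_<uuid> copy kept for rollback. A sketch of just the path derivation (the object name is illustrative; the renames themselves stay with renameDataMapTableStatusFiles):

    import org.apache.carbondata.core.util.path.CarbonTablePath

    object TableStatusPathsSketch {
      // (staged, committed, backup) tablestatus paths for a child table.
      def tableStatusPaths(tablePath: String, uuid: String): (String, String, String) = {
        val staged = CarbonTablePath.getTableStatusFilePathWithUUID(tablePath, uuid) // e.g. tablestatus_1
        val committed = CarbonTablePath.getTableStatusFilePath(tablePath)            // tablestatus
        val backup = committed + "_backup_" + uuid                                   // rollback copy
        (staged, committed, backup)
      }
    }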
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 1bd12cd..ebf87d2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -16,11 +16,12 @@
  */
 package org.apache.spark.sql.execution.command.preaaggregate

-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}

-import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCastExpression}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession, _}
+import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
+import org.apache.spark.sql.CarbonExpressions.MatchCastExpression
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSeq, Cast, Expression, ExprId, NamedExpression, ScalaUDF}
@@ -35,16 +36,12 @@ import org.apache.spark.sql.types.DataType

 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -427,9 +424,7 @@ object PreAggregateUtil {
     locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
     // get the latest carbon table and check for column existence
     // read the latest schema file
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(
-      carbonTable.getAbsoluteTableIdentifier)
-    val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
@@ -518,6 +513,26 @@ object PreAggregateUtil {
     }
   }

+  /**
+   * This method reverts the changes to the schema if add column command fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param numberOfChildSchema
+   * @param sparkSession
+   */
+  def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
+    (sparkSession: SparkSession): Unit = {
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+    carbonTable.getTableLastUpdatedTime
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
+      metastore.revertTableSchemaForPreAggCreationFailure(
+        carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
+    }
+  }
+
   def getChildCarbonTable(databaseName: String, tableName: String)
     (sparkSession: SparkSession): Option[CarbonTable] = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
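The newly added revertMainTableChanges compares the child-schema count persisted in the thrift TableInfo against the expected number and asks the metastore to revert when extras are found. One plausible call site is a guard around child-schema creation; the wrapper below is an illustration under that assumption, not part of the patch:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil

    object RevertGuardSketch {
      // Hypothetical guard: run the creation step and roll the main table
      // schema back via the new helper if it fails midway.
      def createChildSchemaGuarded(dbName: String, tableName: String,
          expectedChildSchemas: Int, createChild: () => Unit)
        (sparkSession: SparkSession): Unit = {
        try {
          createChild()
        } catch {
          case e: Exception =>
            PreAggregateUtil.revertMainTableChanges(
              dbName, tableName, expectedChildSchemas)(sparkSession)
            throw e
        }
      }
    }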
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 4b43ea7..064ba18 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -28,7 +28,6 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableAddColumnPostEvent, AlterTableAddColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
@@ -64,9 +63,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter
         .fromExternalToWrapperTableInfo(thriftTableInfo,
@@ -76,8 +73,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
         dbName,
         wrapperTableInfo,
-        carbonTablePath,
-        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getAbsoluteTableIdentifier,
         sparkSession.sparkContext).process
       // generate dictionary files for the newly added columns
       new AlterTableAddColumnRDD(sparkSession.sparkContext,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 571e23f..8b0d75f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.util.AlterTableUtil
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
@@ -74,9 +73,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
         throwMetadataException(dbName, tableName, s"Invalid Column: $columnName")
       }
       // read the latest schema file
-      val carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       // maintain the added column for schema evolution history
       var addColumnSchema: ColumnSchema = null
       var deletedColumnSchema: ColumnSchema = null
@@ -84,8 +81,9 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       columnSchemaList.foreach { columnSchema =>
         if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
           deletedColumnSchema = columnSchema.deepCopy
-          columnSchema.setData_type(DataTypeConverterUtil
-            .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
+          columnSchema.setData_type(
+            DataTypeConverterUtil.convertToThriftDataType(
+              alterTableDataTypeChangeModel.dataTypeInfo.dataType))
           columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
           columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
           addColumnSchema = columnSchema
@@ -97,8 +95,8 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
       tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
         .setTime_stamp(System.currentTimeMillis)
-      AlterTableUtil
-        .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession,
+      AlterTableUtil.updateSchemaInfo(
+        carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession,
         sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
       val alterTablePostExecutionEvent: AlterTableDataTypeChangePostEvent =
         new AlterTableDataTypeChangePostEvent(sparkSession, carbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9b9125b6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 780ac8f..7bbefd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableDropColumnPostEvent, AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
@@ -99,10 +99,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPreEvent, operationContext)

       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)(sparkSession)
       // maintain the deleted columns for schema evolution history
       var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala
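Taken together, every hunk in this patch follows the same shape: a CarbonStorePath instance built from an identifier is replaced either by a static CarbonTablePath helper on the raw table path or by an accessor on CarbonTable itself. The smallest end-to-end illustration is reading segment status (the object name is illustrative; the old form is reconstructed from the removed lines above):

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}

    object ReadSegmentsSketch {
      // was: SegmentStatusManager.readLoadMetadata(
      //        CarbonStorePath.getCarbonTablePath(identifier).getMetadataDirectoryPath)
      def readSegments(carbonTable: CarbonTable): Array[LoadMetadataDetails] =
        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
    }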
