http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index c8a6b1d..184bf1b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.OperationContext
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
@@ -74,7 +74,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
           "true")
         loadCommand.processData(sqlContext.sparkSession)
         val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
-          carbonTable.getMetaDataFilepath, uuid)
+          carbonTable.getMetadataPath, uuid)
         val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
           case load if loadMetaDataDetails.contains(load) =>
             load.setMergedLoadName(mergedLoadName)
@@ -83,11 +83,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
             load
           case other => other
         }
-        val carbonTablePath = CarbonStorePath
-          .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-            .getAbsoluteTableIdentifier)
-        SegmentStatusManager
-          .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePathWithUUID(uuid),
+        SegmentStatusManager.writeLoadDetailsIntoFile(
+          CarbonTablePath.getTableStatusFilePathWithUUID(uuid),
             updatedLoadMetaDataDetails)
         carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
       } finally {
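A note for readers following the patch: the common thread through every hunk below is that the CarbonStorePath factory, which materialized a CarbonTablePath object from an AbsoluteTableIdentifier, is dropped in favour of static helpers on CarbonTablePath, and CarbonTable.getMetaDataFilepath becomes CarbonTable.getMetadataPath. A minimal sketch of the call-shape change, using only calls visible in the hunk above (variable names are illustrative, not part of the commit):

    // before: build a path object, then ask it for concrete file locations
    val carbonTablePath = CarbonStorePath
      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
    SegmentStatusManager.writeLoadDetailsIntoFile(
      carbonTablePath.getTableStatusFilePathWithUUID(uuid), updatedLoadMetaDataDetails)

    // after: call the static helper directly, with no intermediate object
    SegmentStatusManager.writeLoadDetailsIntoFile(
      CarbonTablePath.getTableStatusFilePathWithUUID(uuid), updatedLoadMetaDataDetails)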
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6f08154..8d3110a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -59,7 +59,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
@@ -72,7 +72,7 @@ import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonData
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
 /**
  * This is the factory class which can create different RDD depends on user needs.
@@ -127,7 +127,7 @@ object CarbonDataRDDFactory {
         LOGGER.error("Not able to acquire the compaction lock for table " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         CarbonCompactionUtil
-            .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+            .createCompactionRequiredFile(carbonTable.getMetadataPath, compactionType)
         // throw exception only in case of DDL trigger.
         if (compactionModel.isDDLTrigger) {
           CarbonException.analysisException(
@@ -195,7 +195,7 @@ object CarbonDataRDDFactory {
                     s"${ tableForCompaction.getDatabaseName }." +
+ s"${ tableForCompaction.getTableName}") val table: CarbonTable = tableForCompaction - val metadataPath = table.getMetaDataFilepath + val metadataPath = table.getMetadataPath val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath) val newCarbonLoadModel = prepareCarbonLoadModel(table) @@ -577,15 +577,13 @@ object CarbonDataRDDFactory { (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))) val loadMetadataDetails = SegmentStatusManager.readLoadMetadata( - carbonTable.getMetaDataFilepath) + carbonTable.getMetadataPath) .filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) || lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS)) val segmentIds = loadMetadataDetails.map(_.getLoadName) val segmentIdIndex = segmentIds.zipWithIndex.toMap - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getTablePath, - carbonTable.getCarbonTableIdentifier) val segmentId2maxTaskNo = segmentIds.map { segId => - (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath)) + (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonLoadModel.getTablePath)) }.toMap class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int) http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index 07acaa5..e1bef9c 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -136,7 +136,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge) val carbonMergerMapping = CarbonMergerMapping( tablePath, - carbonTable.getMetaDataFilepath, + carbonTable.getMetadataPath, mergedLoadName, databaseName, factTableName, @@ -148,7 +148,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, currentPartitions = partitions) carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation) carbonLoadModel.setLoadMetadataDetails( - SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava) + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath).toList.asJava) // trigger event for compaction val alterTableCompactionPreEvent: AlterTableCompactionPreEvent = AlterTableCompactionPreEvent(sqlContext.sparkSession, @@ -234,11 +234,11 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, ((compactionType == CompactionType.IUD_UPDDEL_DELTA) && CarbonDataMergerUtil .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge, - carbonTable.getMetaDataFilepath, + carbonTable.getMetadataPath, carbonLoadModel)) || CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus( loadsToMerge, - carbonTable.getMetaDataFilepath, + carbonTable.getMetadataPath, mergedLoadNumber, carbonLoadModel, compactionType, http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 9b9ca0e..acc3358 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -34,12 +34,12 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -292,9 +292,10 @@ case class CarbonAlterTableCompactionCommand(
           true)(sparkSession, sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
         // 5. remove checkpoint
-        val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-        FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingCheckpointDir))
-        FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingLogDir))
+        FileFactory.deleteAllFilesOfDir(
+          new File(CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)))
+        FileFactory.deleteAllFilesOfDir(
+          new File(CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)))
       } else {
         val msg = "Failed to close streaming table, because streaming is locked for table " +
                  carbonTable.getDatabaseName() + "." + carbonTable.getTableName()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 7800d3e..5dbd383 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -60,7 +60,9 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
@@ -258,8 +260,7 @@ case class CarbonLoadDataCommand(
       carbonLoadModel.setUseOnePass(false)
     }
     // Create table and metadata folders if not exist
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-    val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+    val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
     val fileType = FileFactory.getFileType(metadataDirectoryPath)
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)
@@ -354,9 +355,7 @@ case class CarbonLoadDataCommand(
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       .getCarbonTableIdentifier
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(carbonLoadModel.getTablePath, carbonTableIdentifier)
-    val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+    val dictFolderPath = CarbonTablePath.getMetadataPath(carbonLoadModel.getTablePath)
     val dimensions = carbonTable.getDimensionByTableName(
       carbonTable.getTableName).asScala.toArray
     val colDictFilePath = carbonLoadModel.getColDictFilePath
@@ -1035,4 +1034,5 @@ case class CarbonLoadDataCommand(
     val dataFrameWithTupleId = dataFrame.get.select(fieldWithTupleId: _*)
     (dataFrameWithTupleId)
   }
+
 }
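The CarbonLoadDataCommand hunk above shows the typical directory bootstrap under the new API: derive the metadata folder from the table path string and create it only if missing. Condensed as a sketch, assuming a CarbonTable named table in scope (every call appears in the hunk):

    val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
    val fileType = FileFactory.getFileType(metadataDirectoryPath)
    if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
      FileFactory.mkdirs(metadataDirectoryPath, fileType)
    }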
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index f8f215f..1e5885e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -45,7 +45,7 @@ case class CarbonShowLoadsCommand(
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     CarbonStore.showSegments(
       limit,
-      carbonTable.getMetaDataFilepath
+      carbonTable.getMetadataPath
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index e3e4c7a..a8316e9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
 import org.apache.carbondata.hadoop.util.SchemaReader
 
@@ -63,19 +63,18 @@ case class RefreshCarbonTableCommand(
     // 2.2.1 validate that all the aggregate tables are copied at the store location.
     // 2.2.2 Register the aggregate tables
     val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
+    val identifier = AbsoluteTableIdentifier.from(tablePath, databaseName, tableName)
     // 2.1 check if the table already register with hive then ignore and continue with the next
     // schema
     if (!sparkSession.sessionState.catalog.listTables(databaseName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
       // check the existence of the schema file to know its a carbon table
-      val schemaFilePath = carbonTablePath.getSchemaFilePath
+      val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
       // if schema file does not exist then the table will either non carbon table or stale
       // carbon table
       if (FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))) {
         // read TableInfo
-        val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
+        val tableInfo = SchemaReader.getTableInfo(identifier)
         // 2.2 register the table with the hive check if the table being registered has
         // aggregate table then do the below steps
         // 2.2.1 validate that all the aggregate tables are copied at the store location.
@@ -99,7 +98,7 @@ case class RefreshCarbonTableCommand(
         // Register partitions to hive metastore in case of hive partitioning carbon table
         if (tableInfo.getFactTable.getPartitionInfo != null &&
             tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
-          registerAllPartitionsToHive(absoluteTableIdentifier, sparkSession)
+          registerAllPartitionsToHive(identifier, sparkSession)
         }
       } else {
         LOGGER.audit(
@@ -178,9 +177,7 @@ case class RefreshCarbonTableCommand(
     dataMapSchemaList.asScala.foreach(dataMap => {
       val tableName = dataMap.getChildSchema.getTableName
       val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession)
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath,
-        new CarbonTableIdentifier(dbName, tableName, dataMap.getChildSchema.getTableId))
-      val schemaFilePath = carbonTablePath.getSchemaFilePath
+      val schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath)
       try {
         fileExist = FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))
       } catch {
@@ -191,7 +188,7 @@ case class RefreshCarbonTableCommand(
         return fileExist;
       }
     })
-    return true
+    true
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 25d5e91..10d55ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.processing.exception.MultipleMatchingException
@@ -68,12 +68,11 @@ object DeleteExecution {
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
     val isPartitionTable = carbonTable.isHivePartitionTable
     val factPath = if (isPartitionTable) {
-      carbonTablePath.getPath
+      absoluteTableIdentifier.getTablePath
    } else {
-      carbonTablePath.getFactDir
+      CarbonTablePath.getFactDir(absoluteTableIdentifier.getTablePath)
    }
     var segmentsTobeDeleted = Seq.empty[Segment]
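Worth noting in the DeleteExecution hunk above: for a Hive-partitioned table the fact files sit directly under the table path, otherwise under the dedicated fact directory, and both locations are now derived from the plain table-path string. The branch, condensed from the hunk (identifiers as in the diff):

    val factPath = if (carbonTable.isHivePartitionTable) {
      absoluteTableIdentifier.getTablePath
    } else {
      CarbonTablePath.getFactDir(absoluteTableIdentifier.getTablePath)
    }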
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 38ac58e..c1f86ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -39,7 +39,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.partition.DropPartitionCallable
@@ -65,8 +64,8 @@ case class CarbonAlterTableDropPartitionCommand(
     if (relation == null || CarbonMetadata.getInstance.getCarbonTable(dbName, tableName) == null) {
       throwMetadataException(dbName, tableName, "table not found")
     }
-    val table = relation.carbonTable
-    val partitionInfo = table.getPartitionInfo(tableName)
+    val carbonTable = relation.carbonTable
+    val partitionInfo = carbonTable.getPartitionInfo(tableName)
     if (partitionInfo == null) {
       throwMetadataException(dbName, tableName, "table is not a partition table")
     }
@@ -92,10 +91,9 @@ case class CarbonAlterTableDropPartitionCommand(
         "Dropping range interval partition is unsupported")
     }
     partitionInfo.dropPartition(partitionIndex)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-    // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    // read TableInfo
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)
@@ -108,11 +106,11 @@ case class CarbonAlterTableDropPartitionCommand(
     thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
       .setTime_stamp(System.currentTimeMillis)
     carbonMetaStore.updateTableSchemaForAlter(
-      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
       thriftTable,
       null,
-      table.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+      carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     // update the schema modified time
     carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     // sparkSession.catalog.refreshTable(tableName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 78cf2b8..efd1216 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -41,7 +41,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.partition.SplitPartitionCallable
@@ -72,8 +71,8 @@ case class CarbonAlterTableSplitPartitionCommand(
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       throwMetadataException(dbName, tableName, "table not found")
     }
-    val table = relation.carbonTable
-    val partitionInfo = table.getPartitionInfo(tableName)
+    val carbonTable = relation.carbonTable
+    val partitionInfo = carbonTable.getPartitionInfo(tableName)
     val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
     // keep a copy of partitionIdList before update partitionInfo.
     // will be used in partition data scan
@@ -88,9 +87,8 @@ case class CarbonAlterTableSplitPartitionCommand(
     updatePartitionInfo(partitionInfo, partitionIds)
 
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
     // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)
@@ -100,12 +98,12 @@ case class CarbonAlterTableSplitPartitionCommand(
     wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
     val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo,
       dbName, tableName)
-    carbonMetaStore
-      .updateTableSchemaForAlter(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        thriftTable,
-        null,
-        table.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+    carbonMetaStore.updateTableSchemaForAlter(
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+      thriftTable,
+      null,
+      carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     // update the schema modified time
     carbonMetaStore.updateAndTouchSchemasUpdatedTime()
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 1bd12cd..ebf87d2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -16,11 +16,12 @@
  */
 package org.apache.spark.sql.execution.command.preaaggregate
 
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
-import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCastExpression}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession, _}
+import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
+import org.apache.spark.sql.CarbonExpressions.MatchCastExpression
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSeq, Cast, Expression, ExprId, NamedExpression, ScalaUDF}
@@ -35,16 +36,12 @@ import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -427,9 +424,7 @@ object PreAggregateUtil {
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(
-        carbonTable.getAbsoluteTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         thriftTableInfo,
@@ -518,6 +513,26 @@ object PreAggregateUtil {
     }
   }
 
+  /**
+   * This method reverts the changes to the schema if add column command fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param numberOfChildSchema
+   * @param sparkSession
+   */
+  def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
+    (sparkSession: SparkSession): Unit = {
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+    carbonTable.getTableLastUpdatedTime
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
+      metastore.revertTableSchemaForPreAggCreationFailure(
+        carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
+    }
+  }
+
   def getChildCarbonTable(databaseName: String, tableName: String)
     (sparkSession: SparkSession): Option[CarbonTable] = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 4b43ea7..064ba18 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -28,7 +28,6 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableAddColumnPostEvent, AlterTableAddColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
@@ -64,9 +63,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter
         .fromExternalToWrapperTableInfo(thriftTableInfo,
@@ -76,8 +73,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
         dbName,
         wrapperTableInfo,
-        carbonTablePath,
-        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getAbsoluteTableIdentifier,
         sparkSession.sparkContext).process
       // generate dictionary files for the newly added columns
       new AlterTableAddColumnRDD(sparkSession.sparkContext,
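A recurring change in the command classes above and below: CarbonMetaStore.getThriftTableInfo now takes the CarbonTable itself rather than a precomputed CarbonTablePath, so callers no longer build a path object just to read the schema. The call shape, condensed from these hunks:

    // before
    val carbonTablePath = CarbonStorePath
      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
    val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)

    // after
    val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)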
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 571e23f..f4077e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -26,8 +26,9 @@ import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
@@ -74,9 +75,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       throwMetadataException(dbName, tableName, s"Invalid Column: $columnName")
     }
     // read the latest schema file
-    val carbonTablePath =
-      CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
     // maintain the added column for schema evolution history
     var addColumnSchema: ColumnSchema = null
     var deletedColumnSchema: ColumnSchema = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 780ac8f..7bbefd7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableDropColumnPostEvent, AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
@@ -99,10 +99,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPreEvent, operationContext)
 
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)(sparkSession)
       // maintain the deleted columns for schema evolution history
       var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index c8f64e1..a55dbdd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException}
@@ -95,8 +95,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
       // get the latest carbon table and check for column existence
-      val oldTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
-      val tableMetadataFile = oldTablePath.getPath
+      val tableMetadataFile = oldTableIdentifier.getTablePath
       val operationContext = new OperationContext
       // TODO: Pass new Table Path in pre-event.
       val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
@@ -106,7 +105,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
           sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(oldTablePath)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)(sparkSession)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()
@@ -115,7 +114,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val fileType = FileFactory.getFileType(tableMetadataFile)
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      var newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
+      var newTablePath = CarbonTablePath.getNewTablePath(
+        oldTableIdentifier.getTablePath, newTableIdentifier.getTableName)
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
         .getClient()
@@ -130,9 +130,9 @@ private[sql] case class CarbonAlterTableRenameCommand(
       // changed the rename order to deal with situation when carbon table and hive table
       // will point to the same tablePath
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-        val rename = FileFactory.getCarbonFile(oldTablePath.getPath, fileType)
-          .renameForce(oldTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
-            newTableName)
+        val rename = FileFactory.getCarbonFile(oldTableIdentifier.getTablePath, fileType)
+          .renameForce(
+            CarbonTablePath.getNewTablePath(oldTableIdentifier.getTablePath, newTableName))
         if (!rename) {
           renameBadRecords(newTableName, oldTableName, oldDatabaseName)
           sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
@@ -162,13 +162,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
       case e: Exception =>
         LOGGER.error(e, "Rename table failed: " + e.getMessage)
         if (carbonTable != null) {
-          AlterTableUtil
-            .revertRenameTableChanges(oldTableIdentifier,
-              newTableName,
-              carbonTable.getTablePath,
-              carbonTable.getCarbonTableIdentifier.getTableId,
-              timeStamp)(
-              sparkSession)
+          AlterTableUtil.revertRenameTableChanges(
+            newTableName,
+            carbonTable,
+            timeStamp)(
+            sparkSession)
           renameBadRecords(newTableName, oldTableName, oldDatabaseName)
         }
         throwMetadataException(oldDatabaseName, oldTableName,
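The rename-path computation above is worth calling out: the old code concatenated the parent directory and the new table name by hand (and CarbonUtil.getNewTablePath needed a path object), while the new code derives the target from two plain strings. A sketch of the folder rename as done in the hunk, with error handling trimmed:

    val rename = FileFactory.getCarbonFile(oldTableIdentifier.getTablePath, fileType)
      .renameForce(
        CarbonTablePath.getNewTablePath(oldTableIdentifier.getTablePath, newTableName))
    if (!rename) {
      sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
    }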
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index b44dc7e..fd09e48 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -46,7 +46,8 @@ import org.apache.carbondata.core.metadata.schema
 import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -209,11 +210,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = identifier.getCarbonTableIdentifier.getTableName
     val tablePath = identifier.getTablePath
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName.toLowerCase(),
-      tableName.toLowerCase(), UUID.randomUUID().toString)
-    val carbonTablePath =
-      CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
-    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
     val fileType = FileFactory.getFileType(tableMetadataFile)
     if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
       val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
@@ -240,13 +237,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       tablePath: String) (sparkSession: SparkSession): String = {
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier)
+    val identifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
-    val oldTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-    val newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
+    val newTablePath = CarbonTablePath.getNewTablePath(
+      identifier.getTablePath, newTableIdentifier.getTableName)
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
       newTableIdentifier.getDatabaseName,
@@ -341,8 +338,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
   private def createSchemaThriftFile(
       identifier: AbsoluteTableIdentifier,
       thriftTableInfo: TableInfo): String = {
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    val schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
@@ -356,7 +352,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
     updateSchemasUpdatedTime(touchSchemaFileSystemTime())
-    carbonTablePath.getPath
+    identifier.getTablePath
   }
 
   protected def addTableCache(
@@ -431,8 +427,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       (sparkSession: SparkSession) {
     val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-      .getMetadataDirectoryPath
+    val metadataFilePath = CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath)
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)
     if (null != carbonTable) {
       // clear driver B-tree and dictionary cache
@@ -528,9 +523,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
     metadata.carbonTables
 
-  override def getThriftTableInfo(tablePath: CarbonTablePath)
+  override def getThriftTableInfo(carbonTable: CarbonTable)
     (sparkSession: SparkSession): TableInfo = {
-    val tableMetadataFile = tablePath.getSchemaFilePath
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath)
     CarbonUtil.readSchemaFile(tableMetadataFile)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 16ef38d..5e242b7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, Car
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -96,12 +96,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       Seq()
     }
 
-  override def getThriftTableInfo(tablePath: CarbonTablePath)
+  override def getThriftTableInfo(carbonTable: CarbonTable)
     (sparkSession: SparkSession): format.TableInfo = {
-    val identifier = tablePath.getCarbonTableIdentifier
-    val relation = lookupRelation(TableIdentifier(identifier.getTableName,
-      Some(identifier.getDatabaseName)))(sparkSession).asInstanceOf[CarbonRelation]
-    val carbonTable = relation.metaData.carbonTable
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
       carbonTable.getDatabaseName,
@@ -148,7 +144,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       carbonTablePath: String)(sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    updateHiveMetaStoreForAlter(newTableIdentifier,
+    updateHiveMetaStoreForAlter(
+      newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
       carbonTablePath,
@@ -163,7 +160,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
     val newTablePath =
-      CarbonUtil.getNewTablePath(new Path(oldTablePath), newTableIdentifier.getTableName)
+      CarbonTablePath.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
       newTableIdentifier.getDatabaseName,
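Taken together, the two metastore hunks above show why the signature change pays off: the file-based metastore still reads the thrift schema file from disk, while the Hive metastore can now convert the in-memory TableInfo directly instead of performing a relation lookup to rediscover the very CarbonTable it was handed. A condensed sketch of the two overrides; the final tableName argument to the converter is assumed from the three-argument form visible in the split-partition hunk earlier:

    // CarbonFileMetastore: read the schema file under the table path
    override def getThriftTableInfo(carbonTable: CarbonTable)
      (sparkSession: SparkSession): TableInfo = {
      CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath))
    }

    // CarbonHiveMetaStore: convert the wrapper TableInfo the table already holds
    override def getThriftTableInfo(carbonTable: CarbonTable)
      (sparkSession: SparkSession): format.TableInfo = {
      val schemaConverter = new ThriftWrapperSchemaConverterImpl
      schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
        carbonTable.getDatabaseName, carbonTable.getTableName) // tableName arg assumed
    }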
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 93c7c09..0645040 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -143,7 +143,7 @@ trait CarbonMetaStore {
 
   def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
 
-  def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
+  def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession): TableInfo
 
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 7bf8536..c9833d0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
  * Represents logical plan for one carbon table
@@ -212,10 +212,7 @@ case class CarbonRelation(
         .getValidAndInvalidSegments.getValidSegments.isEmpty) {
       sizeInBytesLocalValue = 0L
     } else {
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(
-        carbonTable.getTablePath,
-        carbonTable.getCarbonTableIdentifier)
-      val tablePath = carbonTablePath.getPath
+      val tablePath = carbonTable.getTablePath
       val fileType = FileFactory.getFileType(tablePath)
       if (FileFactory.isFileExist(tablePath, fileType)) {
         // get the valid segments

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 8ebd5a9..bc36e9c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -36,7 +36,8 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -176,41 +177,28 @@
   /**
    * This method reverts the changes to the schema if the rename table command fails.
-   *
-   * @param oldTableIdentifier
-   * @param newTableName
-   * @param timeStamp
-   * @param sparkSession
    */
-  def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
+  def revertRenameTableChanges(
       newTableName: String,
-      tablePath: String,
-      tableId: String,
+      oldCarbonTable: CarbonTable,
       timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
-    val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val oldCarbonTableIdentifier = new CarbonTableIdentifier(database,
-      oldTableIdentifier.table, tableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, oldCarbonTableIdentifier)
+    val tablePath = oldCarbonTable.getTablePath
+    val tableId = oldCarbonTable.getCarbonTableIdentifier.getTableId
+    val oldCarbonTableIdentifier = oldCarbonTable.getCarbonTableIdentifier
+    val database = oldCarbonTable.getDatabaseName
     val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
-    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableName)
+    val newTablePath = CarbonTablePath.getNewTablePath(tablePath, newTableName)
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val fileType = FileFactory.getFileType(tablePath)
     if (FileFactory.isFileExist(tablePath, fileType)) {
-      val tableInfo = if (metastore.isReadFromHiveMetaStore) {
-        // In case of hive metastore we first update the carbonschema inside old table only.
- metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(tablePath, - new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId)))(sparkSession) - } else { - metastore.getThriftTableInfo(carbonTablePath)(sparkSession) - } + val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)(sparkSession) val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { - LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }") - FileFactory.getCarbonFile(carbonTablePath.getPath, fileType) - .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR + - oldTableIdentifier.table) + LOGGER.error(s"Reverting changes for $database.${oldCarbonTable.getTableName}") + FileFactory.getCarbonFile(tablePath, fileType) + .renameForce(CarbonTablePath.getNewTablePath(tablePath, oldCarbonTable.getTableName)) val absoluteTableIdentifier = AbsoluteTableIdentifier.from( newTablePath, newCarbonTableIdentifier) @@ -233,9 +221,7 @@ object AlterTableUtil { (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, - carbonTable.getCarbonTableIdentifier) - val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -260,9 +246,7 @@ object AlterTableUtil { (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, - carbonTable.getCarbonTableIdentifier) - val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -293,9 +277,7 @@ object AlterTableUtil { (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath, - carbonTable.getCarbonTableIdentifier) - val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -344,9 +326,7 @@ object AlterTableUtil { carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) // get the latest carbon table // read the latest schema file - val carbonTablePath = 
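The revert logic above now derives everything it needs from the old CarbonTable, and the renamed folder path comes from CarbonTablePath.getNewTablePath. Judging only by the call sites in this diff, that helper swaps the final path component for the new table name; a pure-Scala stand-in of that mapping (an assumption for illustration, not the CarbonData implementation):

    // Stand-in for CarbonTablePath.getNewTablePath as used in
    // revertRenameTableChanges: replace the last path segment (the old
    // table folder) with the new table name.
    def newTablePathSketch(oldTablePath: String, newTableName: String): String = {
      val cut = oldTablePath.lastIndexOf('/')
      oldTablePath.substring(0, cut + 1) + newTableName
    }

    // newTablePathSketch("/store/default/old_table", "new_table")
    //   == "/store/default/new_table"
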
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index aadee81..0bdef8a 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -856,9 +856,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   }

   def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index e3678cd..1d41ddc 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.TableOptionConstant

@@ -65,9 +65,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setCsvHeaderColumns(
       CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     // Create table and metadata folders if not exist
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
-    val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+    val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
     val fileType = FileFactory.getFileType(metadataDirectoryPath)
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)
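Both test fixes above illustrate the new static path helpers: segment and metadata directories are now computed directly from the plain table path string, with no CarbonTableIdentifier or CarbonTablePath instance in between. A sketch, assuming an illustrative table path (the literal layout under it is CarbonData's store convention, not defined in this diff):

    import org.apache.carbondata.core.util.path.CarbonTablePath

    val tablePath = "/store/default/t1"   // illustrative path
    // Both helpers appear in the hunks above and take the raw table path.
    val metadataDir = CarbonTablePath.getMetadataPath(tablePath)
    val segmentDir = CarbonTablePath.getSegmentPath(tablePath, "0")
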
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index f9519f8..a465251 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.util.CarbonLoaderUtil

 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index e543893..7ca0b56 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.TableOptionConstant
@@ -179,9 +179,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     carbonLoadModel.setMaxColumns("100")
     // Create table and metadata folders if not exist
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(table.getTablePath, table.getCarbonTableIdentifier)
-    val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
+    val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
     val fileType = FileFactory.getFileType(metadataDirectoryPath)
     if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
       FileFactory.mkdirs(metadataDirectoryPath, fileType)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 4ae3737..dfffc8e 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -33,10 +33,11 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll

 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}

 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

@@ -197,7 +201,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("batch_table", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket
@@ -205,7 +208,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       thread1.start()
       // use thread pool to catch the exception of sink thread
       val pool = Executors.newSingleThreadExecutor()
-      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier)
       val future = pool.submit(thread2)
       Thread.sleep(1000)
       thread1.interrupt()
@@ -225,11 +228,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("stream_table_file", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val csvDataDir = new File("target/csvdata").getCanonicalPath
     // streaming ingest 10 rows
     generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
-    val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1,
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
       identifier)
     thread.start()
     Thread.sleep(2000)
@@ -1086,12 +1088,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier("stream_table_drop", Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket
       val thread1 = createWriteSocketThread(server, 2, 10, 3)
-      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier, "force", 5, 1024L * 200, false)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier, "force", 5, 1024L * 200, false)
       thread1.start()
       thread2.start()
       Thread.sleep(1000)
@@ -1195,7 +1196,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   def createSocketStreamingThread(
       spark: SparkSession,
       port: Int,
-      tablePath: CarbonTablePath,
+      carbonTable: CarbonTable,
       tableIdentifier: TableIdentifier,
       badRecordAction: String = "force",
       intervalSecond: Int = 2,
@@ -1216,7 +1217,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
         qry = readSocketDF.writeStream
           .format("carbondata")
           .trigger(ProcessingTime(s"$intervalSecond seconds"))
-          .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+          .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
           .option("bad_records_action", badRecordAction)
           .option("dbName", tableIdentifier.database.get)
           .option("tableName", tableIdentifier.table)
@@ -1255,7 +1256,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     val identifier = new TableIdentifier(tableName, Option("streaming"))
     val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
       .asInstanceOf[CarbonRelation].metaData.carbonTable
-    val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     var server: ServerSocket = null
     try {
       server = getServerSocket()
@@ -1268,7 +1268,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       val thread2 = createSocketStreamingThread(
         spark = spark,
         port = server.getLocalPort,
-        tablePath = tablePath,
+        carbonTable = carbonTable,
         tableIdentifier = identifier,
         badRecordAction = badRecordAction,
         intervalSecond = intervalOfIngest,
@@ -1316,7 +1316,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   def createFileStreamingThread(
       spark: SparkSession,
-      tablePath: CarbonTablePath,
+      carbonTable: CarbonTable,
       csvDataDir: String,
       intervalSecond: Int,
       tableIdentifier: TableIdentifier): Thread = {
@@ -1330,7 +1330,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       qry = readSocketDF.writeStream
         .format("carbondata")
         .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
-        .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
+        .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
         .option("dbName", tableIdentifier.database.get)
         .option("tableName", tableIdentifier.table)
         .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
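In the streaming tests the thread builders now take the CarbonTable and resolve the checkpoint directory through a static helper at the point of use. A sketch of a sink configured the same way, assuming an existing streaming DataFrame (df, the option values, and the helper name startCarbonSink are illustrative):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.streaming.StreamingQuery
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.util.path.CarbonTablePath

    def startCarbonSink(df: DataFrame, carbonTable: CarbonTable,
        dbName: String, tableName: String): StreamingQuery = {
      df.writeStream
        .format("carbondata")
        // resolved from the raw table path, not a CarbonTablePath instance
        .option("checkpointLocation",
          CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
        .option("dbName", dbName)
        .option("tableName", tableName)
        .start()
    }
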
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 4d5f88c..27ed1bd 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -94,7 +94,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     }
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default", "reverttest")
-    assert(new File(carbonTable.getMetaDataFilepath).listFiles().length < 6)
+    assert(new File(carbonTable.getMetadataPath).listFiles().length < 6)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index a8db6c9..bbc3697 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -56,43 +55,39 @@ public class TableProcessingOperations {
    */
  public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
      final boolean isCompactionFlow) throws IOException {
-    String metaDataLocation = carbonTable.getMetaDataFilepath();
+    String metaDataLocation = carbonTable.getMetadataPath();
    final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());

    //delete folder which metadata no exist in tablestatus
-    for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      String partitionPath = carbonTablePath.getPartitionDir();
-      FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
-      if (FileFactory.isFileExist(partitionPath, fileType)) {
-        CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
-        CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            String segmentId =
-                CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
-            boolean found = false;
-            for (int j = 0; j < details.length; j++) {
-              if (details[j].getLoadName().equals(segmentId)) {
-                found = true;
-                break;
-              }
-            }
-            return !found;
-          }
-        });
-        for (int k = 0; k < listFiles.length; k++) {
+    String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
+    FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
+    if (FileFactory.isFileExist(partitionPath, fileType)) {
+      CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile path) {
          String segmentId =
-              CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
-          if (isCompactionFlow) {
-            if (segmentId.contains(".")) {
-              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
-            }
-          } else {
-            if (!segmentId.contains(".")) {
-              CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+              CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+          boolean found = false;
+          for (int j = 0; j < details.length; j++) {
+            if (details[j].getLoadName().equals(segmentId)) {
+              found = true;
+              break;
            }
          }
+          return !found;
+        }
+      });
+      for (int k = 0; k < listFiles.length; k++) {
+        String segmentId =
+            CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+        if (isCompactionFlow) {
+          if (segmentId.contains(".")) {
+            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+          }
+        } else {
+          if (!segmentId.contains(".")) {
+            CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
+          }
        }
      }
    }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 4cd5014..193d192 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -34,8 +34,6 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -105,12 +103,11 @@ public class FieldEncoderFactory {
           ColumnIdentifier parentColumnIdentifier =
               new ColumnIdentifier(parentColumnTableRelation.getColumnId(), null,
                   dataField.getColumn().getDataType());
-          CarbonTablePath carbonTablePath =
-              CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
           AbsoluteTableIdentifier parentAbsoluteTableIdentifier =
               AbsoluteTableIdentifier.from(
-              CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier.getTableName()),
-              parentTableIdentifier);
+              CarbonTablePath.getNewTablePath(
+                  absoluteTableIdentifier.getTablePath(), parentTableIdentifier.getTableName()),
+              parentTableIdentifier);
           identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
               parentColumnIdentifier, dataField.getColumn().getDataType());
           return new DictionaryFieldConverterImpl(dataField, cache, parentAbsoluteTableIdentifier,
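The cleanup rule preserved through the TableProcessingOperations rewrite is easy to miss in the diff noise: in a compaction flow only merged segments (ids containing a dot, e.g. "0.1") are deleted, otherwise only unmerged ones. A pure-Scala restatement of that predicate, assuming CarbonData's dot convention for merged segment ids as shown in the code above:

    // Mirrors the isCompactionFlow / segmentId.contains(".") branching in
    // deletePartialLoadDataIfExist; not CarbonData code, just its rule.
    def shouldDelete(segmentId: String, isCompactionFlow: Boolean): Boolean =
      if (isCompactionFlow) segmentId.contains(".") else !segmentId.contains(".")

    // shouldDelete("0.1", isCompactionFlow = true)  == true
    // shouldDelete("2",   isCompactionFlow = false) == true
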
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9daad358/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index d3caa99..a08177a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -19,10 +19,8 @@ package org.apache.carbondata.processing.merger;

 import java.util.List;

-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
@@ -42,13 +40,11 @@ public abstract class AbstractResultProcessor {
   public abstract boolean execute(List<RawResultIterator> resultIteratorList);

   protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
-      CompactionType compactionType, CarbonTable carbonTable,
-      CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+      CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
     CarbonDataFileAttributes carbonDataFileAttributes;
     if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
       long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
-          CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(),
-              carbonTable.getCarbonTableIdentifier()));
+          loadModel.getTablePath());
       // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
       // be written in same segment. So the TaskNo should be incremented by 1 from max val.
       long index = taskNo + 1;
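With this last change the IUD delta-compaction branch needs only the load model: getLatestTaskIdForSegment now takes the plain table path instead of a CarbonTablePath built from the table identifier. A sketch of the updated call shape, with the wrapper name nextTaskIndex being illustrative:

    import org.apache.carbondata.core.mutate.CarbonUpdateUtil
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel

    def nextTaskIndex(loadModel: CarbonLoadModel): Long = {
      // Old second argument: CarbonStorePath.getCarbonTablePath(
      //   loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier())
      val taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(
        loadModel.getSegmentId, loadModel.getTablePath)
      // new files for IUD_UPDDEL_DELTA stay in the same segment, so the
      // next task index is the current maximum plus one
      taskNo + 1
    }
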