Repository: carbondata Updated Branches: refs/heads/master e2c517e3f -> 10b393808
[CARBONDATA-3042] Column Schema objects are present in Driver and Executor even after dropping table Problem: Column Schema objects are present in Driver and Executor even after dropping table. Solution: In Driver: After dropping table, remove entry of tableInfo from CarbonMetaDataInstance. In Executor: Remove usage of CarbonMetaDataInstance object and instead pass CarbonTable Object itself This closes #2852 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/10b39380 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/10b39380 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/10b39380 Branch: refs/heads/master Commit: 10b393808e91344b017ba3e946b28217c2dd9757 Parents: e2c517e Author: Indhumathi27 <indhumathi...@gmail.com> Authored: Thu Oct 25 13:37:08 2018 +0530 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Tue Oct 30 14:19:39 2018 +0530 ---------------------------------------------------------------------- .../core/metadata/CarbonMetadata.java | 5 +- .../statusmanager/SegmentStatusManager.java | 6 +- .../carbondata/core/util/DeleteLoadFolders.java | 31 +++++------ .../spark/rdd/AlterTableLoadPartitionRDD.scala | 4 +- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 3 +- .../carbondata/spark/rdd/StreamHandoffRDD.scala | 1 - .../spark/sql/CarbonDictionaryDecoder.scala | 5 -- .../spark/sql/hive/CarbonFileMetastore.scala | 3 +- .../spark/sql/hive/CarbonHiveMetaStore.scala | 2 +- .../loading/DataLoadProcessBuilder.java | 2 - .../sort/impl/ParallelReadMergeSorterImpl.java | 9 ++- ...allelReadMergeSorterWithColumnRangeImpl.java | 8 +-- .../UnsafeBatchParallelReadMergeSorterImpl.java | 6 +- ...allelReadMergeSorterWithColumnRangeImpl.java | 11 ++-- .../CarbonRowDataWriterProcessorStepImpl.java | 13 ++--- .../steps/DataConverterProcessorStepImpl.java | 6 +- .../steps/DataWriterBatchProcessorStepImpl.java | 11 ++-- .../steps/DataWriterProcessorStepImpl.java | 18 +++--- .../merger/CompactionResultSortProcessor.java | 4 +- .../merger/RowResultMergerProcessor.java | 5 +- .../partition/spliter/RowResultProcessor.java | 5 +- .../sort/sortdata/SortParameters.java | 44 +++++++++------ .../store/CarbonFactDataHandlerModel.java | 4 +- .../util/CarbonDataProcessorUtil.java | 58 +++++--------------- 24 files changed, 110 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java index 850f477..e44092e 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java @@ -69,9 +69,8 @@ public final class CarbonMetadata { /** * Below method will be used to set the carbon table - * This method will be used in executor side as driver will always have - * updated table so from driver during query execution and data loading - * we just need to add the table + * Note: Use this method only in driver as clean up in Executor is not handled + * if this table is added to executor * * @param carbonTable */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index 9196367..fbb765b 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -1001,9 +1001,9 @@ public class SegmentStatusManager { CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); } if (updationCompletionStatus) { - DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion( - identifier, carbonTable.getMetadataPath(), - newAddedLoadHistoryList, isForceDeletion, partitionSpecs); + DeleteLoadFolders + .physicalFactAndMeasureMetadataDeletion(carbonTable, newAddedLoadHistoryList, + isForceDeletion, partitionSpecs); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java index f1cc57f..b614f55 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java @@ -66,22 +66,19 @@ public final class DeleteLoadFolders { return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); } - public static void physicalFactAndMeasureMetadataDeletion( - AbsoluteTableIdentifier absoluteTableIdentifier, - String metadataPath, + public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, LoadMetadataDetails[] newAddedLoadHistoryList, boolean isForceDelete, List<PartitionSpec> specs) { - LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath); - physicalFactAndMeasureMetadataDeletion( - absoluteTableIdentifier, + LoadMetadataDetails[] currentDetails = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); + physicalFactAndMeasureMetadataDeletion(carbonTable, currentDetails, isForceDelete, specs, currentDetails); if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) { - physicalFactAndMeasureMetadataDeletion( - absoluteTableIdentifier, + physicalFactAndMeasureMetadataDeletion(carbonTable, newAddedLoadHistoryList, isForceDelete, specs, @@ -91,17 +88,15 @@ public final class DeleteLoadFolders { /** * Delete the invalid data physically from table. - * @param absoluteTableIdentifier table identifier + * @param carbonTable table * @param loadDetails Load details which need clean up * @param isForceDelete is Force delete requested by user * @param specs Partition specs * @param currLoadDetails Current table status load details which are required for update manager. */ - private static void physicalFactAndMeasureMetadataDeletion( - AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] loadDetails, - boolean isForceDelete, List<PartitionSpec> specs, LoadMetadataDetails[] currLoadDetails) { - CarbonTable carbonTable = DataMapStoreManager.getInstance().getCarbonTable( - absoluteTableIdentifier); + private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable, + LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs, + LoadMetadataDetails[] currLoadDetails) { List<TableDataMap> indexDataMaps = new ArrayList<>(); try { for (TableDataMap dataMap : DataMapStoreManager.getInstance().getAllDataMap(carbonTable)) { @@ -112,7 +107,8 @@ public final class DeleteLoadFolders { } catch (IOException e) { LOGGER.warn(String.format( "Failed to get datamaps for %s.%s, therefore the datamap files could not be cleaned.", - absoluteTableIdentifier.getDatabaseName(), absoluteTableIdentifier.getTableName())); + carbonTable.getAbsoluteTableIdentifier().getDatabaseName(), + carbonTable.getAbsoluteTableIdentifier().getTableName())); } SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(carbonTable, currLoadDetails); @@ -120,12 +116,11 @@ public final class DeleteLoadFolders { if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { try { if (oneLoad.getSegmentFile() != null) { - SegmentFileStore.deleteSegment( - absoluteTableIdentifier.getTablePath(), + SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(), new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()), specs, updateStatusManager); } else { - String path = getSegmentPath(absoluteTableIdentifier, oneLoad); + String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad); boolean status = false; if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala index a03447d..4322359 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala @@ -62,13 +62,11 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel, val partitionId: Int = partitionInfo.getPartitionId(split.index) carbonLoadModel.setTaskNo(String.valueOf(partitionId)) carbonLoadModel.setSegmentId(segmentId) - CarbonMetadata.getInstance().addCarbonTable( - carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable) CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, isCompactionFlow = false, isAltPartitionFlow = true) val tempStoreLoc: Array[String] = CarbonDataProcessorUtil.getLocalDataFolderLocation( - databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, false, true) + carbonTable, carbonLoadModel.getTaskNo, segmentId, false, true) val loadStatus: Boolean = if (rows.isEmpty) { LOGGER.info("After repartition this split, NO target rows to write back.") http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 07f6964..1fbcc51 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -86,7 +86,6 @@ class CarbonMergerRDD[K, V]( val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - CarbonMetadata.getInstance().addCarbonTable(carbonTable) val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition] if (carbonTable.isPartitionTable) { carbonLoadModel.setTaskNo(String.valueOf(carbonSparkPartition.partitionId)) @@ -198,7 +197,7 @@ class CarbonMergerRDD[K, V]( } val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation( - databaseName, factTableName, carbonLoadModel.getTaskNo, mergeNumber, true, false) + carbonTable, carbonLoadModel.getTaskNo, mergeNumber, true, false) if (restructuredBlockExists) { LOGGER.info("CompactionResultSortProcessor flow is selected") http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala index 3f0eb71..13172c7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala @@ -117,7 +117,6 @@ class StreamHandoffRDD[K, V]( carbonLoadModel.setTaskNo("" + split.index) val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) - CarbonMetadata.getInstance().addCarbonTable(carbonTable) // the input iterator is using raw row val iteratorList = prepareInputIterator(split, carbonTable) http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index ff7ac60..f3d5bf0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -279,11 +279,6 @@ case class CarbonDictionaryDecoder( if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations && !carbonDimension .getColumnSchema.getParentColumnTableRelations.isEmpty) { - val parentRelationIdentifier = carbonDimension.getColumnSchema - .getParentColumnTableRelations.get(0).getRelationIdentifier - val parentTablePath = CarbonMetadata.getInstance() - .getCarbonTable(parentRelationIdentifier.getDatabaseName, - parentRelationIdentifier.getTableName).getTablePath (QueryUtil .getTableIdentifierForColumn(carbonDimension), new ColumnIdentifier(carbonDimension.getColumnSchema http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 982bbee..96b31c2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -491,7 +491,6 @@ class CarbonFileMetastore extends CarbonMetaStore { // in the other beeline need to update. checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) - removeTableFromMetadata(dbName, tableName) CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) updateSchemasUpdatedTime(touchSchemaFileSystemTime()) // discard cached table info in cachedDataSourceTables @@ -499,6 +498,7 @@ class CarbonFileMetastore extends CarbonMetaStore { sparkSession.sessionState.catalog.refreshTable(tableIdentifier) DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier) + removeTableFromMetadata(dbName, tableName) } else { if (!isTransactionalCarbonTable(absoluteTableIdentifier)) { removeTableFromMetadata(dbName, tableName) @@ -508,6 +508,7 @@ class CarbonFileMetastore extends CarbonMetaStore { sparkSession.sessionState.catalog.refreshTable(tableIdentifier) DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier) + removeTableFromMetadata(dbName, tableName) } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index 5ebe242..c8c7d31 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -79,13 +79,13 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable) } checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) - removeTableFromMetadata(dbName, tableName) CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables val tableIdentifier = TableIdentifier(tableName, Option(dbName)) sparkSession.sessionState.catalog.refreshTable(tableIdentifier) DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier) + removeTableFromMetadata(dbName, tableName) } override def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index f89a4e7..bcca915 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -27,7 +27,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -249,7 +248,6 @@ public final class DataLoadProcessBuilder { configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, loadModel.getBadRecordsLocation()); - CarbonMetadata.getInstance().addCarbonTable(carbonTable); List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(carbonTable.getTableName()); List<CarbonMeasure> measures = http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java index f0920ee..55b336e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java @@ -71,11 +71,10 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter { public void initialize(SortParameters sortParameters) { this.sortParameters = sortParameters; intermediateFileMerger = new SortIntermediateFileMerger(sortParameters); - String[] storeLocations = - CarbonDataProcessorUtil.getLocalDataFolderLocation( - sortParameters.getDatabaseName(), sortParameters.getTableName(), - String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(), - false, false); + String[] storeLocations = CarbonDataProcessorUtil + .getLocalDataFolderLocation(sortParameters.getCarbonTable(), + String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(), false, + false); // Set the data file location String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java index 3b767aa..8b86c0c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java @@ -148,7 +148,7 @@ public class ParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSor private SingleThreadFinalSortFilesMerger getFinalMerger(SortParameters sortParameters) { String[] storeLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), + .getLocalDataFolderLocation(sortParameters.getCarbonTable(), String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId() + "", false, false); // Set the data file location @@ -188,9 +188,9 @@ public class ParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSor private void setTempLocation(SortParameters parameters) { String[] carbonDataDirectoryPath = CarbonDataProcessorUtil - .getLocalDataFolderLocation(parameters.getDatabaseName(), - parameters.getTableName(), parameters.getTaskNo(), - parameters.getSegmentId(), false, false); + .getLocalDataFolderLocation(parameters.getCarbonTable(), parameters.getTaskNo(), + parameters.getSegmentId(), + false, false); String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); parameters.setTempFileLocation(tmpLocs); http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java index 9cb67df..aa960b6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java @@ -232,9 +232,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter } private void setTempLocation(SortParameters parameters) { - String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation( - parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(), - parameters.getSegmentId(), false, false); + String[] carbonDataDirectoryPath = CarbonDataProcessorUtil + .getLocalDataFolderLocation(parameters.getCarbonTable(), parameters.getTaskNo(), + parameters.getSegmentId(), false, false); String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); parameters.setTempFileLocation(tempDirs); http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java index a8ec05c..f9631a5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java @@ -143,9 +143,9 @@ public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(SortParameters sortParameters) { String[] storeLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), - String.valueOf(sortParameters.getTaskNo()), - sortParameters.getSegmentId() + "", false, false); + .getLocalDataFolderLocation(sortParameters.getCarbonTable(), + String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId() + "", false, + false); // Set the data file location String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -182,9 +182,8 @@ public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe private void setTempLocation(SortParameters parameters) { String[] carbonDataDirectoryPath = CarbonDataProcessorUtil - .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(), - parameters.getTaskNo(), parameters.getSegmentId(), - false, false); + .getLocalDataFolderLocation(parameters.getCarbonTable(), parameters.getTaskNo(), + parameters.getSegmentId(), false, false); String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); LOGGER.warn("set temp location: " + StringUtils.join(tmpLoc, ", ")); http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java index ae42df7..ce79f24 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -100,13 +100,10 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces child.initialize(); } - private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) { - String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation( - tableIdentifier.getDatabaseName(), - tableIdentifier.getTableName(), - String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), - false, - false); + private String[] getStoreLocation() { + String[] storeLocation = CarbonDataProcessorUtil + .getLocalDataFolderLocation(this.configuration.getTableSpec().getCarbonTable(), + String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false); CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } @@ -161,7 +158,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces } private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException { - String[] storeLocation = getStoreLocation(tableIdentifier); + String[] storeLocation = getStoreLocation(); DataMapWriterListener listener = getDataMapWriterListener(0); CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel( configuration, storeLocation, 0, iteratorIndex, listener); http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java index ae9ec3d..0195877 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java @@ -136,15 +136,13 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte Arrays.sort(convertedSortColumnRanges, new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(), sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil - .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(), - configuration.getTableIdentifier().getTableName()))); + .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable()))); // range partitioner to dispatch rows by sort columns this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges, new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(), sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil - .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(), - configuration.getTableIdentifier().getTableName()))); + .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable()))); } // only convert sort column fields http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java index 694b345..7cb102b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java @@ -67,11 +67,10 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS child.initialize(); } - private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) { - return CarbonDataProcessorUtil.getLocalDataFolderLocation( - tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), - String.valueOf(configuration.getTaskNo()), - configuration.getSegmentId(), false, false); + private String[] getStoreLocation() { + return CarbonDataProcessorUtil + .getLocalDataFolderLocation(configuration.getTableSpec().getCarbonTable(), + String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false); } @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException { @@ -84,7 +83,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); int i = 0; - String[] storeLocation = getStoreLocation(tableIdentifier); + String[] storeLocation = getStoreLocation(); CarbonDataProcessorUtil.createLocations(storeLocation); for (Iterator<CarbonRowBatch> iterator : iterators) { int k = 0; http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java index 3d704c9..1595e1b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java @@ -89,19 +89,16 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { this.carbonFactHandlers = new CopyOnWriteArrayList<>(); } - private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) { + private String[] getStoreLocation() { String[] storeLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), - tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), - configuration.getSegmentId(), false, false); + .getLocalDataFolderLocation(configuration.getTableSpec().getCarbonTable(), + String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false); CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } public CarbonFactDataHandlerModel getDataHandlerModel() { - CarbonTableIdentifier tableIdentifier = - configuration.getTableIdentifier().getCarbonTableIdentifier(); - String[] storeLocation = getStoreLocation(tableIdentifier); + String[] storeLocation = getStoreLocation(); listener = getDataMapWriterListener(0); CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener); @@ -170,14 +167,13 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { @Override public Void call() throws Exception { LOGGER.info("Process writer forward for table " + tableIdentifier.getTableName() + ", range: " + rangeId); - processRange(insideRangeIterator, tableIdentifier, rangeId); + processRange(insideRangeIterator, rangeId); return null; } } - private void processRange(Iterator<CarbonRowBatch> insideRangeIterator, - CarbonTableIdentifier tableIdentifier, int rangeId) { - String[] storeLocation = getStoreLocation(tableIdentifier); + private void processRange(Iterator<CarbonRowBatch> insideRangeIterator, int rangeId) { + String[] storeLocation = getStoreLocation(); listener = getDataMapWriterListener(rangeId); CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 8d28d45..7c7b8ee 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -463,7 +463,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { */ private void initializeFinalThreadMergerForMergeSort() { boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil - .getNoDictSortColMapping(carbonTable.getDatabaseName(), carbonTable.getTableName()); + .getNoDictSortColMapping(carbonTable); sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping); String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation, CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -482,7 +482,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { + carbonLoadModel.getFactTimeStamp() + ".tmp"; } else { carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName, + .createCarbonStoreLocation(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(), carbonLoadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index dcb7cb4..6475ba8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -81,8 +81,9 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel .getFactTimeStamp() + ".tmp"; } else { - carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation( - loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); + carbonStoreLocation = CarbonDataProcessorUtil + .createCarbonStoreLocation(loadModel.getCarbonDataLoadSchema().getCarbonTable(), + loadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index ac70f27..977b9d3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -50,8 +50,9 @@ public class RowResultProcessor { CarbonDataProcessorUtil.createLocations(tempStoreLocation); this.segmentProperties = segProp; String tableName = carbonTable.getTableName(); - String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation( - loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); + String carbonStoreLocation = CarbonDataProcessorUtil + .createCarbonStoreLocation(loadModel.getCarbonDataLoadSchema().getCarbonTable(), + loadModel.getSegmentId()); CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, tempStoreLocation, carbonStoreLocation); http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java index ecce232..7908f4f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java @@ -139,6 +139,11 @@ public class SortParameters implements Serializable { private int batchSortSizeinMb; private int rangeId = 0; + /** + * CarbonTable Info + */ + private CarbonTable carbonTable; + public SortParameters getCopy() { SortParameters parameters = new SortParameters(); parameters.tempFileLocation = tempFileLocation; @@ -172,6 +177,7 @@ public class SortParameters implements Serializable { parameters.numberOfCores = numberOfCores; parameters.batchSortSizeinMb = batchSortSizeinMb; parameters.rangeId = rangeId; + parameters.carbonTable = carbonTable; return parameters; } @@ -375,11 +381,21 @@ public class SortParameters implements Serializable { this.batchSortSizeinMb = batchSortSizeinMb; } + public void setCarbonTable(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + } + + public CarbonTable getCarbonTable() { + return carbonTable; + } + + public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) { SortParameters parameters = new SortParameters(); CarbonTableIdentifier tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); CarbonProperties carbonProperties = CarbonProperties.getInstance(); + parameters.setCarbonTable(configuration.getTableSpec().getCarbonTable()); parameters.setDatabaseName(tableIdentifier.getDatabaseName()); parameters.setTableName(tableIdentifier.getTableName()); parameters.setPartitionID("0"); @@ -401,8 +417,7 @@ public class SortParameters implements Serializable { parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns()); parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns()); parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil - .getNoDictSortColMapping(configuration.getTableIdentifier().getDatabaseName(), - configuration.getTableIdentifier().getTableName())); + .getNoDictSortColMapping(parameters.getCarbonTable())); parameters.setSortColumn(configuration.getSortColumnMapping()); parameters.setObserver(new SortObserver()); // get sort buffer size @@ -418,10 +433,9 @@ public class SortParameters implements Serializable { LOGGER.info("Number of intermediate file to be merged: " + parameters .getNumberOfIntermediateFileToBeMerged()); - - String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation( - tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), - configuration.getTaskNo(), configuration.getSegmentId(), false, false); + String[] carbonDataDirectoryPath = CarbonDataProcessorUtil + .getLocalDataFolderLocation(parameters.getCarbonTable(), + configuration.getTaskNo(), configuration.getSegmentId(), false, false); String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -453,11 +467,9 @@ public class SortParameters implements Serializable { DataType[] measureDataType = configuration.getMeasureDataType(); parameters.setMeasureDataType(measureDataType); parameters.setNoDictDataType(CarbonDataProcessorUtil - .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(), - configuration.getTableIdentifier().getTableName())); + .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable())); Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil - .getNoDictSortAndNoSortDataTypes(configuration.getTableIdentifier().getDatabaseName(), - configuration.getTableIdentifier().getTableName()); + .getNoDictSortAndNoSortDataTypes(configuration.getTableSpec().getCarbonTable()); parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes")); parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes")); return parameters; @@ -477,6 +489,7 @@ public class SortParameters implements Serializable { boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow) { SortParameters parameters = new SortParameters(); CarbonProperties carbonProperties = CarbonProperties.getInstance(); + parameters.setCarbonTable(carbonTable); parameters.setDatabaseName(databaseName); parameters.setTableName(tableName); parameters.setPartitionID(CarbonTablePath.DEPRECATED_PATITION_ID); @@ -506,7 +519,7 @@ public class SortParameters implements Serializable { .getNumberOfIntermediateFileToBeMerged()); String[] carbonDataDirectoryPath = CarbonDataProcessorUtil - .getLocalDataFolderLocation(databaseName, tableName, taskNo, segmentId, + .getLocalDataFolderLocation(carbonTable, taskNo, segmentId, isCompactionFlow, false); String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -532,17 +545,16 @@ public class SortParameters implements Serializable { CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT))); DataType[] type = CarbonDataProcessorUtil - .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(), - parameters.getTableName()); + .getMeasureDataType(parameters.getMeasureColCount(), parameters.getCarbonTable()); parameters.setMeasureDataType(type); parameters.setNoDictDataType(CarbonDataProcessorUtil - .getNoDictDataTypes(parameters.getDatabaseName(), parameters.getTableName())); + .getNoDictDataTypes(carbonTable)); Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil - .getNoDictSortAndNoSortDataTypes(parameters.getDatabaseName(), parameters.getTableName()); + .getNoDictSortAndNoSortDataTypes(parameters.getCarbonTable()); parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes")); parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes")); parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil - .getNoDictSortColMapping(parameters.getDatabaseName(), parameters.getTableName())); + .getNoDictSortColMapping(parameters.getCarbonTable())); return parameters; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index d086b6d..878ce6b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -30,7 +30,6 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -205,8 +204,7 @@ public class CarbonFactDataHandlerModel { } } } - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable( - identifier.getDatabaseName(), identifier.getTableName()); + CarbonTable carbonTable = configuration.getTableSpec().getCarbonTable(); List<ColumnSchema> wrapperColumnSchema = CarbonUtil .getColumnSchemaList(carbonTable.getDimensionByTableName(identifier.getTableName()), http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 6f36ef8..044e4e8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -33,7 +33,6 @@ import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.constants.LoggerAction; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -138,25 +137,6 @@ public final class CarbonDataProcessorUtil { } /** - * This method will form the local data folder store location - * - * @param databaseName - * @param tableName - * @param taskId - * @param segmentId - * @param isCompactionFlow - * @param isAltPartitionFlow - * @return - */ - public static String[] getLocalDataFolderLocation(String databaseName, String tableName, - String taskId, String segmentId, boolean isCompactionFlow, - boolean isAltPartitionFlow) { - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); - return getLocalDataFolderLocation(carbonTable, taskId, - segmentId, isCompactionFlow, isAltPartitionFlow); - } - - /** * This method will form the key for getting the temporary location set in carbon properties * * @param databaseName @@ -413,14 +393,12 @@ public final class CarbonDataProcessorUtil { return columnNames; } - public static DataType[] getMeasureDataType(int measureCount, String databaseName, - String tableName) { + public static DataType[] getMeasureDataType(int measureCount, CarbonTable carbonTable) { DataType[] type = new DataType[measureCount]; for (int i = 0; i < type.length; i++) { type[i] = DataTypes.DOUBLE; } - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); - List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName); + List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(carbonTable.getTableName()); for (int i = 0; i < type.length; i++) { type[i] = measures.get(i).getDataType(); } @@ -430,13 +408,12 @@ public final class CarbonDataProcessorUtil { /** * Get the no dictionary data types on the table * - * @param databaseName - * @param tableName + * @param carbonTable * @return */ - public static DataType[] getNoDictDataTypes(String databaseName, String tableName) { - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); - List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName); + public static DataType[] getNoDictDataTypes(CarbonTable carbonTable) { + List<CarbonDimension> dimensions = + carbonTable.getDimensionByTableName(carbonTable.getTableName()); List<DataType> type = new ArrayList<>(); for (int i = 0; i < dimensions.size(); i++) { if (dimensions.get(i).isSortColumn() && !dimensions.get(i).hasEncoding(Encoding.DICTIONARY)) { @@ -449,13 +426,12 @@ public final class CarbonDataProcessorUtil { /** * Get the no dictionary sort column mapping of the table * - * @param databaseName - * @param tableName + * @param carbonTable * @return */ - public static boolean[] getNoDictSortColMapping(String databaseName, String tableName) { - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); - List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName); + public static boolean[] getNoDictSortColMapping(CarbonTable carbonTable) { + List<CarbonDimension> dimensions = + carbonTable.getDimensionByTableName(carbonTable.getTableName()); List<Boolean> noDicSortColMap = new ArrayList<>(); for (int i = 0; i < dimensions.size(); i++) { if (dimensions.get(i).isSortColumn()) { @@ -477,14 +453,12 @@ public final class CarbonDataProcessorUtil { /** * Get the data types of the no dictionary sort columns * - * @param databaseName - * @param tableName + * @param carbonTable * @return */ - public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypes(String databaseName, - String tableName) { - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); - List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName); + public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypes(CarbonTable carbonTable) { + List<CarbonDimension> dimensions = + carbonTable.getDimensionByTableName(carbonTable.getTableName()); List<DataType> noDictSortType = new ArrayList<>(); List<DataType> noDictNoSortType = new ArrayList<>(); for (int i = 0; i < dimensions.size(); i++) { @@ -509,9 +483,7 @@ public final class CarbonDataProcessorUtil { * * @return data directory path */ - public static String createCarbonStoreLocation(String databaseName, String tableName, - String segmentId) { - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); + public static String createCarbonStoreLocation(CarbonTable carbonTable, String segmentId) { return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId); }