http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index 2a69f0d..a4d3d2b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -272,7 +272,7 @@ public class CarbonCompactionUtil { public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables, List<CarbonTableIdentifier> skipList) { for (CarbonTable ctable : carbonTables) { - String metadataPath = ctable.getMetaDataFilepath(); + String metadataPath = ctable.getMetadataPath(); // check for the compaction required file and at the same time exclude the tables which are // present in the skip list. if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index c141636..89326a3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -169,15 +169,13 @@ public final class CarbonDataMergerUtil { // End Timestamp. // Table Update Status Metadata Update. - AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier identifier = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - SegmentUpdateStatusManager segmentUpdateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); + new SegmentUpdateStatusManager(identifier); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock(); ICarbonLock statusLock = segmentStatusManager.getTableStatusLock(); @@ -224,7 +222,7 @@ public final class CarbonDataMergerUtil { } LoadMetadataDetails[] loadDetails = - segmentStatusManager.readLoadMetadata(metaDataFilepath); + SegmentStatusManager.readLoadMetadata(metaDataFilepath); for (LoadMetadataDetails loadDetail : loadDetails) { if (loadsToMerge.contains(loadDetail)) { @@ -237,18 +235,18 @@ public final class CarbonDataMergerUtil { } } - segmentUpdateStatusManager - .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp); - segmentStatusManager - .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails); + segmentUpdateStatusManager.writeLoadDetailsIntoFile( + Arrays.asList(updateLists), timestamp); + SegmentStatusManager.writeLoadDetailsIntoFile( + CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), loadDetails); status = true; } else { LOGGER.error("Not able to acquire the lock."); status = false; } } catch (IOException e) { - LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath - .getMetadataDirectoryPath()); + LOGGER.error("Error while updating metadata. The metadata file path is " + + CarbonTablePath.getMetadataPath(identifier.getTablePath())); status = false; } finally { @@ -284,9 +282,9 @@ public final class CarbonDataMergerUtil { String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel, CompactionType compactionType, String segmentFile) throws IOException { boolean tableStatusUpdationStatus = false; - AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier identifier = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -295,10 +293,7 @@ public final class CarbonDataMergerUtil { LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "." + carbonLoadModel.getTableName() + " for table status updation "); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier); - - String statusFilePath = carbonTablePath.getTableStatusFilePath(); + String statusFilePath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath); @@ -617,8 +612,6 @@ public final class CarbonDataMergerUtil { // variable to store one segment size across partition. long sizeOfOneSegmentAcrossPartition; if (segment.getSegmentFile() != null) { - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier()); sizeOfOneSegmentAcrossPartition = CarbonUtil .getSizeOfSegment(carbonTablePath, new Segment(segId, segment.getSegmentFile())); } else { @@ -662,35 +655,17 @@ public final class CarbonDataMergerUtil { /** * For calculating the size of the specified segment * @param tablePath the store path of the segment - * @param tableIdentifier identifier of table that the segment belong to * @param segId segment id * @return the data size of the segment */ - private static long getSizeOfSegment(String tablePath, - CarbonTableIdentifier tableIdentifier, String segId) { - String loadPath = getStoreLocation(tablePath, tableIdentifier, segId); + private static long getSizeOfSegment(String tablePath, String segId) { + String loadPath = CarbonTablePath.getSegmentPath(tablePath, segId); CarbonFile segmentFolder = FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath)); return getSizeOfFactFileInLoad(segmentFolder); } /** - * This method will get the store location for the given path, segemnt id and partition id - * - * @param tablePath - * @param carbonTableIdentifier identifier of catbon table that the segment belong to - * @param segmentId segment id - * @return the store location of the segment - */ - private static String getStoreLocation(String tablePath, - CarbonTableIdentifier carbonTableIdentifier, String segmentId) { - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier); - return carbonTablePath.getCarbonDataDirectoryPath(segmentId); - } - - - /** * Identify the segments to be merged based on the segment count * * @param listOfSegmentsAfterPreserve the list of segments after @@ -1033,7 +1008,7 @@ public final class CarbonDataMergerUtil { * if UpdateDelta Files are more than IUD Compaction threshold. * * @param seg - * @param absoluteTableIdentifier + * @param identifier * @param segmentUpdateStatusManager * @param numberDeltaFilesThreshold * @return @@ -1045,9 +1020,7 @@ public final class CarbonDataMergerUtil { CarbonFile[] updateDeltaFiles = null; Set<String> uniqueBlocks = new HashSet<String>(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg.getSegmentNo()); + String segmentPath = CarbonTablePath.getSegmentPath(absoluteTableIdentifier.getTablePath(), seg.getSegmentNo()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); @@ -1295,15 +1268,12 @@ public final class CarbonDataMergerUtil { CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true); // Update the Table Status. - String metaDataFilepath = table.getMetaDataFilepath(); - AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier(); - - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier); + String metaDataFilepath = table.getMetadataPath(); + AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier(); - String tableStatusPath = carbonTablePath.getTableStatusFilePath(); + String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -1317,7 +1287,7 @@ public final class CarbonDataMergerUtil { + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = - segmentStatusManager.readLoadMetadata(metaDataFilepath); + SegmentStatusManager.readLoadMetadata(metaDataFilepath); for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { if (loadMetadata.getLoadName().equalsIgnoreCase("0")) { @@ -1326,7 +1296,7 @@ public final class CarbonDataMergerUtil { } } try { - segmentStatusManager + SegmentStatusManager .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); } catch (IOException e) { return false; http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 732a7e8..5062a78 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datamap.DataMapWriterListener; import org.apache.carbondata.processing.datatypes.GenericDataType; @@ -337,9 +336,8 @@ public class CarbonFactDataHandlerModel { return configuration.getDataWritePath(); } AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String carbonDataDirectoryPath = carbonTablePath - .getCarbonDataDirectoryPath(configuration.getSegmentId() + ""); + String carbonDataDirectoryPath = CarbonTablePath + .getSegmentPath(absoluteTableIdentifier.getTablePath(), configuration.getSegmentId() + ""); CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); return carbonDataDirectoryPath; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index e319160..2c08c18 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -44,7 +44,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datatypes.ArrayDataType; import org.apache.carbondata.processing.datatypes.GenericDataType; @@ -143,12 +142,9 @@ public final class CarbonDataProcessorUtil { String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, File.pathSeparator); String[] localDataFolderLocArray = new String[baseTmpStorePathArray.length]; - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); for (int i = 0 ; i < baseTmpStorePathArray.length; i++) { String tmpStore = baseTmpStorePathArray[i]; - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier()); - String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); + String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(tmpStore, segmentId); localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId; } @@ -376,16 +372,12 @@ public final class CarbonDataProcessorUtil { * @return data directory path */ public static String createCarbonStoreLocation(String factStoreLocation, - String databaseName, String tableName, String partitionId, String segmentId) { + String databaseName, String tableName, String segmentId) { CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); - CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier); - String carbonDataDirectoryPath = - carbonTablePath.getCarbonDataDirectoryPath(segmentId); - return carbonDataDirectoryPath; + return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId); } + /** * initialise data type for measures for their storage format */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index c135a88..e7c52f6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -46,7 +46,6 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation; import org.apache.carbondata.core.locks.CarbonLockUtil; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -55,7 +54,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.merger.NodeBlockRelation; @@ -74,11 +72,8 @@ public final class CarbonLoaderUtil { } public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) { - CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier()); - - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(currentLoad + ""); + String segmentPath = CarbonTablePath.getSegmentPath( + loadModel.getTablePath(), currentLoad + ""); deleteStorePath(segmentPath); } @@ -91,33 +86,26 @@ public final class CarbonLoaderUtil { */ public static boolean isValidSegment(CarbonLoadModel loadModel, int currentLoad) { - CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema() - .getCarbonTable(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath( - loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier()); int fileCount = 0; - int partitionCount = carbonTable.getPartitionCount(); - for (int i = 0; i < partitionCount; i++) { - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath( - currentLoad + ""); - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, - FileFactory.getFileType(segmentPath)); - CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() { - - @Override - public boolean accept(CarbonFile file) { - return file.getName().endsWith( - CarbonTablePath.getCarbonIndexExtension()) - || file.getName().endsWith( - CarbonTablePath.getCarbonDataExtension()); - } - - }); - fileCount += files.length; - if (files.length > 0) { - return true; + String segmentPath = CarbonTablePath.getSegmentPath( + loadModel.getTablePath(), currentLoad + ""); + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, + FileFactory.getFileType(segmentPath)); + CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() { + + @Override + public boolean accept(CarbonFile file) { + return file.getName().endsWith( + CarbonTablePath.getCarbonIndexExtension()) + || file.getName().endsWith( + CarbonTablePath.getCarbonDataExtension()); } + + }); + fileCount += files.length; + if (files.length > 0) { + return true; } if (fileCount == 0) { return false; @@ -183,21 +171,20 @@ public final class CarbonLoaderUtil { CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid, List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) throws IOException { boolean status = false; - AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier identifier = loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String metadataPath = carbonTablePath.getMetadataDirectoryPath(); + String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath()); FileType fileType = FileFactory.getFileType(metadataPath); if (!FileFactory.isFileExist(metadataPath, fileType)) { FileFactory.mkdirs(metadataPath, fileType); } String tableStatusPath; if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) { - tableStatusPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid); + tableStatusPath = CarbonTablePath.getTableStatusFilePathWithUUID(uuid); } else { - tableStatusPath = carbonTablePath.getTableStatusFilePath(); + tableStatusPath = CarbonTablePath.getTableStatusFilePath(); } - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); int retryCount = CarbonLockUtil .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, @@ -211,7 +198,8 @@ public final class CarbonLoaderUtil { "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName() + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = - SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath()); + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(identifier.getTablePath())); List<LoadMetadataDetails> listOfLoadFolderDetails = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List<CarbonFile> staleFolders = new ArrayList<>(); @@ -238,12 +226,12 @@ public final class CarbonLoaderUtil { for (LoadMetadataDetails entry : listOfLoadFolderDetails) { if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS && SegmentStatusManager.isLoadInProgress( - absoluteTableIdentifier, entry.getLoadName())) { + identifier, entry.getLoadName())) { throw new RuntimeException("Already insert overwrite is in progress"); } else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS && SegmentStatusManager.isLoadInProgress( - absoluteTableIdentifier, entry.getLoadName())) { + identifier, entry.getLoadName())) { throw new RuntimeException("Already insert into or load is in progress"); } } @@ -268,7 +256,7 @@ public final class CarbonLoaderUtil { entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); // For insert overwrite, we will delete the old segment folder immediately // So collect the old segments here - addToStaleFolders(carbonTablePath, staleFolders, entry); + addToStaleFolders(identifier, staleFolders, entry); } } } @@ -281,7 +269,7 @@ public final class CarbonLoaderUtil { // when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE // so empty segment folder should be deleted if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) { - addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry); + addToStaleFolders(identifier, staleFolders, newMetaEntry); } for (LoadMetadataDetails detail: listOfLoadFolderDetails) { @@ -326,9 +314,10 @@ public final class CarbonLoaderUtil { return status; } - private static void addToStaleFolders(CarbonTablePath carbonTablePath, + private static void addToStaleFolders(AbsoluteTableIdentifier identifier, List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException { - String path = carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName()); + String path = CarbonTablePath.getSegmentPath( + identifier.getTablePath(), entry.getLoadName()); // add to the deletion list only if file exist else HDFS file system will throw // exception while deleting the file if file path does not exist if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { @@ -354,11 +343,9 @@ public final class CarbonLoaderUtil { loadMetadataDetails.setLoadStartTime(loadStartTime); } - public static void writeLoadMetadata(AbsoluteTableIdentifier absoluteTableIdentifier, + public static void writeLoadMetadata(AbsoluteTableIdentifier identifier, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException { - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String dataLoadLocation = carbonTablePath.getTableStatusFilePath(); + String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); DataOutputStream dataOutputStream; Gson gsonObjectToWrite = new Gson(); @@ -906,10 +893,8 @@ public final class CarbonLoaderUtil { * This method will get the store location for the given path, segment id and partition id */ public static void checkAndCreateCarbonDataLocation(String segmentId, CarbonTable carbonTable) { - CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier); - String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath(segmentId); + String segmentFolder = CarbonTablePath.getSegmentPath( + carbonTable.getTablePath(), segmentId); CarbonUtil.checkAndCreateFolder(segmentFolder); } @@ -938,9 +923,7 @@ public final class CarbonLoaderUtil { */ public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails, String segmentId, CarbonTable carbonTable) throws IOException { - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier())); - Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, + Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTable.getAbsoluteTableIdentifier(), new Segment(segmentId, loadMetadataDetails.getSegmentFile())); Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE); loadMetadataDetails.setDataSize(String.valueOf(dataSize)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java index 288cd54..c00cc86 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java @@ -35,7 +35,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; public final class DeleteLoadFolders { @@ -50,15 +49,14 @@ public final class DeleteLoadFolders { /** * returns segment path * - * @param absoluteTableIdentifier + * @param identifier * @param oneLoad * @return */ - private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier, + private static String getSegmentPath(AbsoluteTableIdentifier identifier, LoadMetadataDetails oneLoad) { - CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); String segmentId = oneLoad.getLoadName(); - return carbon.getCarbonDataDirectoryPath(segmentId); + return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); } public static void physicalFactAndMeasureMetadataDeletion( http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java index cd1e28a..d30891a 100644 --- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java +++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java @@ -63,30 +63,6 @@ public class BlockIndexStoreTest extends TestCase { } -// public void testLoadAndGetTaskIdToSegmentsMapForSingleSegment() -// throws IOException { -// File file = getPartFile(); -// TableBlockInfo info = -// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// CarbonTableIdentifier carbonTableIdentifier = -// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); -// AbsoluteTableIdentifier absoluteTableIdentifier = -// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier); -// try { -// -// List<TableBlockUniqueIdentifier> tableBlockInfoList = -// getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier); -// List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockInfoList); -// assertTrue(loadAndGetBlocks.size() == 1); -// } catch (Exception e) { -// assertTrue(false); -// } -// List<String> segmentIds = new ArrayList<>(); -// segmentIds.add(info.getSegment()); -// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); -// } -// private List<TableBlockUniqueIdentifier> getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos, AbsoluteTableIdentifier absoluteTableIdentifier) { List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = new ArrayList<>(); @@ -95,138 +71,6 @@ public class BlockIndexStoreTest extends TestCase { } return tableBlockUniqueIdentifiers; } -// -// public void testloadAndGetTaskIdToSegmentsMapForSameBlockLoadedConcurrently() -// throws IOException { -// String canonicalPath = -// new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath(); -// File file = getPartFile(); -// TableBlockInfo info = -// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// TableBlockInfo info1 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// -// TableBlockInfo info2 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// TableBlockInfo info3 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// TableBlockInfo info4 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// -// CarbonTableIdentifier carbonTableIdentifier = -// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); -// AbsoluteTableIdentifier absoluteTableIdentifier = -// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier); -// ExecutorService executor = Executors.newFixedThreadPool(3); -// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }), -// absoluteTableIdentifier)); -// executor.submit( -// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }), -// absoluteTableIdentifier)); -// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }), -// absoluteTableIdentifier)); -// executor.submit( -// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }), -// absoluteTableIdentifier)); -// executor.shutdown(); -// try { -// executor.awaitTermination(1, TimeUnit.DAYS); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// List<TableBlockInfo> tableBlockInfos = -// Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 }); -// try { -// List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = -// getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier); -// List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockUniqueIdentifiers); -// assertTrue(loadAndGetBlocks.size() == 5); -// } catch (Exception e) { -// assertTrue(false); -// } -// List<String> segmentIds = new ArrayList<>(); -// for (TableBlockInfo tableBlockInfo : tableBlockInfos) { -// segmentIds.add(tableBlockInfo.getSegment()); -// } -// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); -// } -// -// public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently() -// throws IOException { -// String canonicalPath = -// new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath(); -// File file = getPartFile(); -// TableBlockInfo info = -// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// TableBlockInfo info1 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// -// TableBlockInfo info2 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// TableBlockInfo info3 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// TableBlockInfo info4 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// -// TableBlockInfo info5 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" }, -// file.length(),ColumnarFormatVersion.V3, null); -// TableBlockInfo info6 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// -// TableBlockInfo info7 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// -// CarbonTableIdentifier carbonTableIdentifier = -// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); -// AbsoluteTableIdentifier absoluteTableIdentifier = -// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier); -// ExecutorService executor = Executors.newFixedThreadPool(3); -// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }), -// absoluteTableIdentifier)); -// executor.submit( -// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }), -// absoluteTableIdentifier)); -// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info5, info6 }), -// absoluteTableIdentifier)); -// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info7 }), -// absoluteTableIdentifier)); -// -// executor.shutdown(); -// try { -// executor.awaitTermination(1, TimeUnit.DAYS); -// } catch (InterruptedException e) { -// // TODO Auto-generated catch block -// e.printStackTrace(); -// } -// List<TableBlockInfo> tableBlockInfos = Arrays -// .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 }); -// try { -// List<TableBlockUniqueIdentifier> blockUniqueIdentifierList = -// getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier); -// List<AbstractIndex> loadAndGetBlocks = cache.getAll(blockUniqueIdentifierList); -// assertTrue(loadAndGetBlocks.size() == 8); -// } catch (Exception e) { -// assertTrue(false); -// } -// List<String> segmentIds = new ArrayList<>(); -// for (TableBlockInfo tableBlockInfo : tableBlockInfos) { -// segmentIds.add(tableBlockInfo.getSegment()); -// } -// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); -// } private class BlockLoaderThread implements Callable<Void> { private List<TableBlockInfo> tableBlockInfoList; @@ -248,7 +92,7 @@ public class BlockIndexStoreTest extends TestCase { } private static File getPartFile() { - String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath() + String path = StoreCreator.getIdentifier().getTablePath() + "/Fact/Part0/Segment_0"; File file = new File(path); File[] files = file.listFiles(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index 7f0aef6..d42dcde 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -64,7 +64,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonDictionaryWriter; import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl; @@ -98,14 +97,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; */ public class StoreCreator { - private static AbsoluteTableIdentifier absoluteTableIdentifier; + private static AbsoluteTableIdentifier identifier; private static String storePath = ""; static { try { storePath = new File("target/store").getCanonicalPath(); String dbName = "testdb"; String tableName = "testtable"; - absoluteTableIdentifier = + identifier = AbsoluteTableIdentifier.from( storePath + "/testdb/testtable", new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString())); @@ -114,8 +113,8 @@ public class StoreCreator { } } - public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() { - return absoluteTableIdentifier; + public static AbsoluteTableIdentifier getIdentifier() { + return identifier; } /** @@ -134,12 +133,12 @@ public class StoreCreator { CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table); CarbonLoadModel loadModel = new CarbonLoadModel(); loadModel.setCarbonDataLoadSchema(schema); - loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); - loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); - loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); + loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName()); + loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName()); + loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName()); loadModel.setFactFilePath(factFilePath); loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>()); - loadModel.setTablePath(absoluteTableIdentifier.getTablePath()); + loadModel.setTablePath(identifier.getTablePath()); loadModel.setDateFormat(null); loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, @@ -175,9 +174,9 @@ public class StoreCreator { private static CarbonTable createTable() throws IOException { TableInfo tableInfo = new TableInfo(); - tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); + tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName()); TableSchema tableSchema = new TableSchema(); - tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); + tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName()); List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>(); ArrayList<Encoding> encodings = new ArrayList<>(); encodings.add(Encoding.DICTIONARY); @@ -257,16 +256,13 @@ public class StoreCreator { tableSchema.setSchemaEvalution(schemaEvol); tableSchema.setTableId(UUID.randomUUID().toString()); tableInfo.setTableUniqueName( - absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName() + identifier.getCarbonTableIdentifier().getTableUniqueName() ); tableInfo.setLastUpdatedTime(System.currentTimeMillis()); tableInfo.setFactTable(tableSchema); - tableInfo.setTablePath(absoluteTableIdentifier.getTablePath()); + tableInfo.setTablePath(identifier.getTablePath()); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); - String schemaFilePath = carbonTablePath.getSchemaFilePath(); + String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath()); String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath); CarbonMetadata.getInstance().loadTableMetadata(tableInfo); @@ -329,7 +325,7 @@ public class StoreCreator { writer.close(); writer.commit(); Dictionary dict = (Dictionary) dictCache.get( - new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, + new DictionaryColumnUniqueIdentifier(identifier, columnIdentifier, dims.get(i).getDataType())); CarbonDictionarySortInfoPreparator preparator = new CarbonDictionarySortInfoPreparator(); @@ -444,7 +440,7 @@ public class StoreCreator { loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime())); listOfLoadFolderDetails.add(loadMetadataDetails); - String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + File.separator + String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator + CarbonCommonConstants.LOADMETADATA_FILENAME; DataOutputStream dataOutputStream; http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java index 7b823ac..8c9889d 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java @@ -38,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.FileFormat; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonIndexFileWriter; import org.apache.carbondata.format.BlockIndex; @@ -60,8 +59,6 @@ public class StreamSegment { * get stream segment or create new stream segment if not exists */ public static String open(CarbonTable table) throws IOException { - CarbonTablePath tablePath = - CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier()); SegmentStatusManager segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -72,7 +69,8 @@ public class StreamSegment { + " for stream table get or create segment"); LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath()); + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(table.getTablePath())); LoadMetadataDetails streamSegment = null; for (LoadMetadataDetails detail : details) { if (FileFormat.ROW_V1 == detail.getFileFormat()) { @@ -97,8 +95,8 @@ public class StreamSegment { newDetails[i] = details[i]; } newDetails[i] = newDetail; - SegmentStatusManager - .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails); + SegmentStatusManager.writeLoadDetailsIntoFile( + CarbonTablePath.getTableStatusFilePath(table.getTablePath()), newDetails); return newDetail.getLoadName(); } else { return streamSegment.getLoadName(); @@ -126,8 +124,6 @@ public class StreamSegment { */ public static String close(CarbonTable table, String segmentId) throws IOException { - CarbonTablePath tablePath = - CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier()); SegmentStatusManager segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -138,7 +134,8 @@ public class StreamSegment { + " for stream table finish segment"); LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath()); + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(table.getTablePath())); for (LoadMetadataDetails detail : details) { if (segmentId.equals(detail.getLoadName())) { detail.setLoadEndTime(System.currentTimeMillis()); @@ -162,7 +159,8 @@ public class StreamSegment { } newDetails[i] = newDetail; SegmentStatusManager - .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails); + .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath( + table.getTablePath()), newDetails); return newDetail.getLoadName(); } else { LOGGER.error( @@ -192,7 +190,7 @@ public class StreamSegment { try { if (statusLock.lockWithRetries()) { LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath()); + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); boolean updated = false; for (LoadMetadataDetails detail : details) { if (SegmentStatus.STREAMING == detail.getSegmentStatus()) { @@ -202,10 +200,8 @@ public class StreamSegment { } } if (updated) { - CarbonTablePath tablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier()); SegmentStatusManager.writeLoadDetailsIntoFile( - tablePath.getTableStatusFilePath(), + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()), details); } } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 790f9d8..c9e61d3 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -38,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection} import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} @@ -221,7 +221,6 @@ object StreamHandoffRDD { ): Unit = { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val identifier = carbonTable.getAbsoluteTableIdentifier - val tablePath = CarbonStorePath.getCarbonTablePath(identifier) var continueHandoff = false // require handoff lock on table val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK) @@ -238,7 +237,7 @@ object StreamHandoffRDD { try { if (statusLock.lockWithRetries()) { loadMetadataDetails = SegmentStatusManager.readLoadMetadata( - tablePath.getMetadataDirectoryPath) + CarbonTablePath.getMetadataPath(identifier.getTablePath)) } } finally { if (null != statusLock) { @@ -360,19 +359,16 @@ object StreamHandoffRDD { loadModel: CarbonLoadModel ): Boolean = { var status = false - val metaDataFilepath = - loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath() - val identifier = - loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier() - val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier) - val metadataPath = carbonTablePath.getMetadataDirectoryPath() + val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath + val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier + val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath) val fileType = FileFactory.getFileType(metadataPath) if (!FileFactory.isFileExist(metadataPath, fileType)) { FileFactory.mkdirs(metadataPath, fileType) } - val tableStatusPath = carbonTablePath.getTableStatusFilePath() + val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath) val segmentStatusManager = new SegmentStatusManager(identifier) - val carbonLock = segmentStatusManager.getTableStatusLock() + val carbonLock = segmentStatusManager.getTableStatusLock try { if (carbonLock.lockWithRetries()) { LOGGER.info( @@ -406,7 +402,7 @@ object StreamHandoffRDD { status = true } else { LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel - .getDatabaseName() + "." + loadModel.getTableName()); + .getDatabaseName() + "." + loadModel.getTableName()) } } finally { if (carbonLock.unlock()) { @@ -417,6 +413,6 @@ object StreamHandoffRDD { "." + loadModel.getTableName() + " during table status updation") } } - return status + status } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index 75fcfb0..6316d84 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -31,7 +31,7 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -127,16 +127,14 @@ object StreamSinkFactory { * @return */ private def getStreamSegmentId(carbonTable: CarbonTable): String = { - val carbonTablePath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val fileType = FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath) - if (!FileFactory.isFileExist(carbonTablePath.getMetadataDirectoryPath, fileType)) { + val segmentId = StreamSegment.open(carbonTable) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) + val fileType = FileFactory.getFileType(segmentDir) + if (!FileFactory.isFileExist(segmentDir, fileType)) { // Create table directory path, in case of enabling hive metastore first load may not have // table folder created. - FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType) + FileFactory.mkdirs(segmentDir, fileType) } - val segmentId = StreamSegment.open(carbonTable) - val segmentDir = carbonTablePath.getSegmentDir(segmentId) if (FileFactory.isFileExist(segmentDir, fileType)) { // recover fault StreamSegment.recoverSegmentIfRequired(segmentDir) http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e1f2b95/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index 206ba91..4f839ce 100644 --- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -40,7 +40,7 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.stats.QueryStatistic import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil @@ -62,9 +62,7 @@ class CarbonAppendableStreamSink( carbonLoadModel: CarbonLoadModel, server: Option[DictionaryServer]) extends Sink { - private val carbonTablePath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - private val fileLogPath = carbonTablePath.getStreamingLogDir + private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath) private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath) // prepare configuration private val hadoopConf = { @@ -150,12 +148,12 @@ class CarbonAppendableStreamSink( * if the directory size of current segment beyond the threshold, hand off new segment */ private def checkOrHandOffSegment(): Unit = { - val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId) val fileType = FileFactory.getFileType(segmentDir) if (segmentMaxSize <= StreamSegment.size(segmentDir)) { val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId) currentSegmentId = newSegmentId - val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId) + val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId) FileFactory.mkdirs(newSegmentDir, fileType) // TODO trigger hand off operation @@ -251,15 +249,13 @@ object CarbonAppendableStreamSink { } // update data file info in index file - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId)) + StreamSegment.updateIndexFile( + CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)) } catch { // catch fault of executor side case t: Throwable => - val tablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = tablePath.getSegmentDir(segmentId) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) StreamSegment.recoverSegmentIfRequired(segmentDir) LOGGER.error(t, s"Aborting job ${ job.getJobID }.") committer.abortJob(job)