[CARBONDATA-2270] Write segment file in loading for non-partition table Currently, when loading into a partition table, Carbon writes a segment file that records the mapping from the segment to its index file locations. This avoids frequent listFile operations when querying. The same should be done for non-partition tables as well.
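For context, a sketch of the mechanism this commit extends to non-partition tables: at load time the index files under the segment folder are listed once, and the folder-to-files mapping is persisted as a small JSON segment file under Metadata/segments, so queries read that mapping instead of listing the file system. The following is a minimal standalone illustration, not the actual SegmentFileStore implementation; the class and method names are hypothetical, and it assumes the conventional Fact/Part0/Segment_<id> segment layout and the segmentId_UUID.segment naming introduced by this commit.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the segment-file idea from this commit.
public class SegmentFileSketch {

  // List the ".carbonindex" files in the segment folder once, at load time.
  static List<String> listIndexFiles(Path segmentDir) throws IOException {
    List<String> names = new ArrayList<>();
    if (!Files.isDirectory(segmentDir)) {
      return names; // nothing loaded yet
    }
    try (DirectoryStream<Path> stream =
        Files.newDirectoryStream(segmentDir, "*.carbonindex")) {
      for (Path p : stream) {
        names.add(p.getFileName().toString());
      }
    }
    return names;
  }

  // Persist the folder -> index-file mapping (cf. locationMap/FolderDetails)
  // as a JSON segment file named "<segmentId>_<uuid>.segment".
  static Path writeSegmentFile(Path tablePath, String segmentId, String uuid)
      throws IOException {
    Path segmentDir = tablePath.resolve("Fact/Part0/Segment_" + segmentId);
    String relativeDir = "/" + tablePath.relativize(segmentDir);
    String files = listIndexFiles(segmentDir).stream()
        .map(f -> "\"" + f + "\"")
        .collect(Collectors.joining(","));
    String json = "{\"locationMap\":{\"" + relativeDir + "\":"
        + "{\"files\":[" + files + "],"
        + "\"status\":\"Success\",\"isRelative\":true}}}";
    Path segmentFilesDir = tablePath.resolve("Metadata/segments");
    Files.createDirectories(segmentFilesDir);
    Path out = segmentFilesDir.resolve(segmentId + "_" + uuid + ".segment");
    // A query can now read this mapping instead of listing the segment folder.
    return Files.write(out, json.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) throws IOException {
    Path segmentFile = writeSegmentFile(
        Paths.get("/tmp/store/default/t1"), "0",
        String.valueOf(System.currentTimeMillis()));
    System.out.println("wrote " + segmentFile);
  }
}

In the commit itself this corresponds to the new SegmentFileStore.writeSegmentFile(tablePath, segmentId, UUID) below, whose returned file name is then recorded against the load in the table status file via updateSegmentFile(...) under a table status lock.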
This closes #2092 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7e0803fe Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7e0803fe Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7e0803fe Branch: refs/heads/master Commit: 7e0803fec02f5872569e7680ce4d3ab02507285b Parents: e43be5e Author: Jacky Li <[email protected]> Authored: Fri Mar 23 12:13:11 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Fri Mar 30 11:18:42 2018 +0800 ---------------------------------------------------------------------- .../carbondata/core/datamap/DataMapChooser.java | 5 +- .../core/datamap/DataMapStoreManager.java | 61 +++---- .../core/datamap/IndexDataMapProvider.java | 3 +- .../core/datastore/SegmentTaskIndexStore.java | 14 +- .../core/metadata/SegmentFileStore.java | 172 +++++++++++++++++-- .../core/mutate/CarbonUpdateUtil.java | 70 ++++++-- .../SegmentUpdateStatusManager.java | 72 ++++---- .../core/writer/CarbonIndexFileMergeWriter.java | 46 +++-- .../hadoop/api/CarbonFileInputFormat.java | 7 +- .../hadoop/api/CarbonInputFormat.java | 6 +- .../hadoop/api/CarbonOutputCommitter.java | 3 +- .../hadoop/api/CarbonTableInputFormat.java | 32 ++-- .../hadoop/api/DistributableDataMapFormat.java | 10 +- .../sdv/generated/MergeIndexTestCase.scala | 8 +- .../CarbonIndexFileMergeTestCase.scala | 14 +- .../iud/DeleteCarbonTableTestCase.scala | 10 +- .../iud/UpdateCarbonTableTestCase.scala | 8 +- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 4 +- .../org/apache/spark/util/PartitionUtils.scala | 21 ++- .../spark/rdd/CarbonDataRDDFactory.scala | 29 +++- .../spark/rdd/CarbonTableCompactor.scala | 26 ++- .../org/apache/spark/sql/CarbonCountStar.scala | 4 +- .../command/mutation/DeleteExecution.scala | 4 +- .../command/mutation/HorizontalCompaction.scala | 4 +- ...arbonAlterTableAddHivePartitionCommand.scala | 4 +- .../CarbonAlterTableDropPartitionCommand.scala | 4 +- .../CarbonAlterTableSplitPartitionCommand.scala | 4 +- .../strategy/CarbonLateDecodeStrategy.scala | 2 +- .../processing/merger/CarbonDataMergerUtil.java | 16 +- .../processing/util/CarbonLoaderUtil.java | 76 +------- 30 files changed, 458 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java index f9214a8..ac00e71 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java @@ -87,8 +87,9 @@ public class DataMapChooser { } } // Return the default datamap if no other datamap exists. 
- return new DataMapExprWrapperImpl(DataMapStoreManager.getInstance() - .getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier()), resolverIntf); + return new DataMapExprWrapperImpl( + DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable), + resolverIntf); } private ExpressionTuple selectDataMap(Expression expression, List<TableDataMap> allDataMap) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index d01df4f..b0aff0a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -102,7 +102,7 @@ public final class DataMapStoreManager { if (dataMapSchema.isIndexDataMap() && identifier.getTableName() .equals(carbonTable.getTableName()) && identifier.getDatabaseName() .equals(carbonTable.getDatabaseName())) { - dataMaps.add(getDataMap(carbonTable.getAbsoluteTableIdentifier(), dataMapSchema)); + dataMaps.add(getDataMap(carbonTable, dataMapSchema)); } } } @@ -187,32 +187,33 @@ public final class DataMapStoreManager { /** * It gives the default datamap of the table. Default datamap of any table is BlockletDataMap * - * @param identifier + * @param table * @return */ - public TableDataMap getDefaultDataMap(AbsoluteTableIdentifier identifier) { - return getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA); + public TableDataMap getDefaultDataMap(CarbonTable table) { + return getDataMap(table, BlockletDataMapFactory.DATA_MAP_SCHEMA); } /** * Get the datamap for reading data. */ - public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) { - String table = identifier.getCarbonTableIdentifier().getTableUniqueName(); - List<TableDataMap> tableIndices = allDataMaps.get(table); + public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) { + String tableUniqueName = + table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableUniqueName(); + List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName); TableDataMap dataMap = null; if (tableIndices != null) { dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices); } if (dataMap == null) { - synchronized (table.intern()) { - tableIndices = allDataMaps.get(table); + synchronized (tableUniqueName.intern()) { + tableIndices = allDataMaps.get(tableUniqueName); if (tableIndices != null) { dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices); } if (dataMap == null) { try { - dataMap = createAndRegisterDataMap(identifier, dataMapSchema); + dataMap = createAndRegisterDataMap(table, dataMapSchema); } catch (Exception e) { throw new RuntimeException(e); } @@ -231,7 +232,7 @@ public final class DataMapStoreManager { * The datamap is created using datamap name, datamap factory class and table identifier. 
*/ // TODO: make it private - public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier, + public TableDataMap createAndRegisterDataMap(CarbonTable table, DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException { DataMapFactory dataMapFactory; try { @@ -246,34 +247,34 @@ public final class DataMapStoreManager { throw new MetadataProcessException( "failed to create DataMap '" + dataMapSchema.getProviderName() + "'", e); } - return registerDataMap(identifier, dataMapSchema, dataMapFactory); + return registerDataMap(table, dataMapSchema, dataMapFactory); } - public TableDataMap registerDataMap(AbsoluteTableIdentifier identifier, + public TableDataMap registerDataMap(CarbonTable table, DataMapSchema dataMapSchema, DataMapFactory dataMapFactory) throws IOException, MalformedDataMapCommandException { - String table = identifier.getCarbonTableIdentifier().getTableUniqueName(); + String tableUniqueName = table.getCarbonTableIdentifier().getTableUniqueName(); // Just update the segmentRefreshMap with the table if not added. - getTableSegmentRefresher(identifier); - List<TableDataMap> tableIndices = allDataMaps.get(table); + getTableSegmentRefresher(table); + List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName); if (tableIndices == null) { tableIndices = new ArrayList<>(); } - dataMapFactory.init(identifier, dataMapSchema); + dataMapFactory.init(table.getAbsoluteTableIdentifier(), dataMapSchema); BlockletDetailsFetcher blockletDetailsFetcher; SegmentPropertiesFetcher segmentPropertiesFetcher = null; if (dataMapFactory instanceof BlockletDetailsFetcher) { blockletDetailsFetcher = (BlockletDetailsFetcher) dataMapFactory; } else { - blockletDetailsFetcher = getBlockletDetailsFetcher(identifier); + blockletDetailsFetcher = getBlockletDetailsFetcher(table); } segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher; - TableDataMap dataMap = new TableDataMap(identifier, dataMapSchema, dataMapFactory, - blockletDetailsFetcher, segmentPropertiesFetcher); + TableDataMap dataMap = new TableDataMap(table.getAbsoluteTableIdentifier(), + dataMapSchema, dataMapFactory, blockletDetailsFetcher, segmentPropertiesFetcher); tableIndices.add(dataMap); - allDataMaps.put(table, tableIndices); + allDataMaps.put(tableUniqueName, tableIndices); return dataMap; } @@ -294,7 +295,7 @@ public final class DataMapStoreManager { * @param segments */ public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments) { - getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier()).clear(segments); + getDefaultDataMap(carbonTable).clear(segments); List<TableDataMap> allDataMap = getAllDataMap(carbonTable); for (TableDataMap dataMap: allDataMap) { dataMap.clear(segments); @@ -347,11 +348,11 @@ public final class DataMapStoreManager { /** * Get the blocklet datamap factory to get the detail information of blocklets * - * @param identifier + * @param table * @return */ - private BlockletDetailsFetcher getBlockletDetailsFetcher(AbsoluteTableIdentifier identifier) { - TableDataMap blockletMap = getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA); + private BlockletDetailsFetcher getBlockletDetailsFetcher(CarbonTable table) { + TableDataMap blockletMap = getDataMap(table, BlockletDataMapFactory.DATA_MAP_SCHEMA); return (BlockletDetailsFetcher) blockletMap.getDataMapFactory(); } @@ -367,10 +368,10 @@ public final class DataMapStoreManager { /** * Get the TableSegmentRefresher for the table. If not existed then add one and return. 
*/ - public TableSegmentRefresher getTableSegmentRefresher(AbsoluteTableIdentifier identifier) { - String uniqueName = identifier.uniqueName(); + public TableSegmentRefresher getTableSegmentRefresher(CarbonTable table) { + String uniqueName = table.getAbsoluteTableIdentifier().uniqueName(); if (segmentRefreshMap.get(uniqueName) == null) { - segmentRefreshMap.put(uniqueName, new TableSegmentRefresher(identifier)); + segmentRefreshMap.put(uniqueName, new TableSegmentRefresher(table)); } return segmentRefreshMap.get(uniqueName); } @@ -388,8 +389,8 @@ public final class DataMapStoreManager { // altering. private Map<String, Boolean> manualSegmentRefresh = new HashMap<>(); - public TableSegmentRefresher(AbsoluteTableIdentifier identifier) { - SegmentUpdateStatusManager statusManager = new SegmentUpdateStatusManager(identifier); + TableSegmentRefresher(CarbonTable table) { + SegmentUpdateStatusManager statusManager = new SegmentUpdateStatusManager(table); SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails(); for (SegmentUpdateDetails updateDetails : updateStatusDetails) { UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java index e188bf1..b1729d1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java @@ -47,8 +47,7 @@ public class IndexDataMapProvider implements DataMapProvider { new RelationIdentifier(mainTable.getDatabaseName(), mainTable.getTableName(), mainTable.getTableInfo().getFactTable().getTableId())); DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema); - DataMapStoreManager.getInstance().registerDataMap( - mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory); + DataMapStoreManager.getInstance().registerDataMap(mainTable, dataMapSchema, dataMapFactory); storageProvider.saveSchema(dataMapSchema); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java index 8ed5c18..d9e544f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java @@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; @@ -87,8 +88,9 @@ 
public class SegmentTaskIndexStore SegmentTaskIndexWrapper segmentTaskIndexWrapper = null; try { segmentTaskIndexWrapper = - loadAndGetTaskIdToSegmentsMap(tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(), - tableSegmentUniqueIdentifier.getAbsoluteTableIdentifier(), + loadAndGetTaskIdToSegmentsMap( + tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(), + CarbonTable.buildFromTablePath("name", "path"), tableSegmentUniqueIdentifier); } catch (IndexBuilderException e) { throw new IOException(e.getMessage(), e); @@ -163,21 +165,20 @@ public class SegmentTaskIndexStore * map * * @param segmentToTableBlocksInfos segment id to block info - * @param absoluteTableIdentifier absolute table identifier + * @param table table handle * @return map of taks id to segment mapping * @throws IOException */ private SegmentTaskIndexWrapper loadAndGetTaskIdToSegmentsMap( Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos, - AbsoluteTableIdentifier absoluteTableIdentifier, + CarbonTable table, TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException { // task id to segment map Iterator<Map.Entry<String, List<TableBlockInfo>>> iteratorOverSegmentBlocksInfos = segmentToTableBlocksInfos.entrySet().iterator(); Map<TaskBucketHolder, AbstractIndex> taskIdToSegmentIndexMap = null; SegmentTaskIndexWrapper segmentTaskIndexWrapper = null; - SegmentUpdateStatusManager updateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); + SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table); String segmentId = null; TaskBucketHolder taskBucketHolder = null; try { @@ -226,6 +227,7 @@ public class SegmentTaskIndexStore } Iterator<Map.Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator = taskIdToTableBlockInfoMap.entrySet().iterator(); + AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier(); long requiredSize = calculateRequiredSize(taskIdToTableBlockInfoMap, absoluteTableIdentifier); segmentTaskIndexWrapper.setMemorySize(requiredSize); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 4adc977..257ee4c 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -20,6 +20,7 @@ import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; @@ -32,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; @@ -42,6 +45,8 @@ import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; import org.apache.carbondata.core.fileoperations.FileWriteOperation; import org.apache.carbondata.core.indexstore.PartitionSpec; import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.locks.CarbonLockUtil; +import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; @@ -61,6 +66,9 @@ import org.apache.hadoop.fs.Path; */ public class SegmentFileStore { + private static LogService LOGGER = LogServiceFactory.getLogService( + SegmentFileStore.class.getCanonicalName()); + private SegmentFile segmentFile; /** @@ -104,7 +112,6 @@ public class SegmentFileStore { isRelative = true; } SegmentFile segmentFile = new SegmentFile(); - Map<String, FolderDetails> locationMap = new HashMap<>(); FolderDetails folderDetails = new FolderDetails(); folderDetails.setRelative(isRelative); folderDetails.setPartitions(partionNames); @@ -112,8 +119,7 @@ public class SegmentFileStore { for (CarbonFile file : carbonFiles) { folderDetails.getFiles().add(file.getName()); } - locationMap.put(location, folderDetails); - segmentFile.setLocationMap(locationMap); + segmentFile.addPath(location, folderDetails); String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT; // write segment info to new file. writeSegmentFile(segmentFile, path); @@ -122,6 +128,56 @@ public class SegmentFileStore { } /** + * Generate Segment file name + * @param segmentId segment id + * @param UUID unique string, typically caller can use the loading start + * timestamp in CarbonLoadModel + * @return + */ + public static String genSegmentFileName(String segmentId, String UUID) { + return segmentId + "_" + UUID; + } + + /** + * Write segment file to the metadata folder of the table + * @param tablePath table path + * @param segmentId segment id + * @param UUID a UUID string used to construct the segment file name + * @return segment file name + */ + public static String writeSegmentFile(String tablePath, String segmentId, String UUID) + throws IOException { + String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId); + CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath); + CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT); + } + }); + if (indexFiles != null && indexFiles.length > 0) { + SegmentFile segmentFile = new SegmentFile(); + FolderDetails folderDetails = new FolderDetails(); + folderDetails.setRelative(true); + folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage()); + for (CarbonFile file : indexFiles) { + folderDetails.getFiles().add(file.getName()); + } + String segmentRelativePath = segmentPath.substring(tablePath.length(), segmentPath.length()); + segmentFile.addPath(segmentRelativePath, folderDetails); + String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath); + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder); + if (!carbonFile.exists()) { + carbonFile.mkdirs(segmentFileFolder, FileFactory.getFileType(segmentFileFolder)); + } + String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT; + // write segment info to new file. 
+ writeSegmentFile(segmentFile, segmentFileFolder + File.separator + segmentFileName); + return segmentFileName; + } + return null; + } + + /** * Writes the segment file in json format * @param segmentFile * @param path @@ -177,6 +233,60 @@ public class SegmentFileStore { return null; } + /** + * This API will update the segmentFile of a passed segment. + * + * @return boolean which determines whether status update is done or not. + * @throws IOException + */ + public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile) + throws IOException { + boolean status = false; + String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath); + String metadataPath = CarbonTablePath.getMetadataPath(tablePath); + AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier.from(tablePath, null, null); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); + int retryCount = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); + int maxTimeout = CarbonLockUtil + .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, + CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); + try { + if (carbonLock.lockWithRetries(retryCount, maxTimeout)) { + LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation"); + LoadMetadataDetails[] listOfLoadFolderDetailsArray = + SegmentStatusManager.readLoadMetadata(metadataPath); + + for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) { + // if the segments is in the list of marked for delete then update the status. 
+ if (segmentId.equals(detail.getLoadName())) { + detail.setSegmentFile(segmentFile); + break; + } + } + + SegmentStatusManager + .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); + status = true; + } else { + LOGGER.error( + "Not able to acquire the lock for Table status updation for table path " + tablePath); + } + ; + } finally { + if (carbonLock.unlock()) { + LOGGER.info("Table unlocked successfully after table status updation" + tablePath); + } else { + LOGGER.error( + "Unable to unlock Table lock for table" + tablePath + " during table status updation"); + } + } + return status; + } + private static CarbonFile[] getSegmentFiles(String segmentPath) { CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); if (carbonFile.exists()) { @@ -213,7 +323,6 @@ public class SegmentFileStore { isRelative = true; } SegmentFile localSegmentFile = new SegmentFile(); - Map<String, FolderDetails> locationMap = new HashMap<>(); FolderDetails folderDetails = new FolderDetails(); folderDetails.setRelative(isRelative); folderDetails.setPartitions(spec.getPartitions()); @@ -228,8 +337,7 @@ public class SegmentFileStore { folderDetails.getFiles().add(file.getName()); } } - locationMap.put(location, folderDetails); - localSegmentFile.setLocationMap(locationMap); + localSegmentFile.addPath(location, folderDetails); if (segmentFile == null) { segmentFile = localSegmentFile; } else { @@ -440,7 +548,8 @@ public class SegmentFileStore { if (updateSegment) { String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath); writePath = - writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentNo() + "_" + uniqueId + writePath + CarbonCommonConstants.FILE_SEPARATOR + + SegmentFileStore.genSegmentFileName(segment.getSegmentNo(), String.valueOf(uniqueId)) + CarbonTablePath.SEGMENT_EXT; writeSegmentFile(segmentFile, writePath); } @@ -564,9 +673,17 @@ public class SegmentFileStore { Map<String, List<String>> locationMap) { for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) { Path location = new Path(entry.getKey()).getParent(); - boolean exists = pathExistsInPartitionSpec(partitionSpecs, location); - if (!exists) { - FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString())); + if (partitionSpecs != null) { + boolean exists = pathExistsInPartitionSpec(partitionSpecs, location); + if (!exists) { + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString())); + } + } else { + // delete the segment folder if it is empty + CarbonFile file = FileFactory.getCarbonFile(location.toString()); + if (file.listFiles().length == 0) { + file.delete(); + } } } } @@ -698,9 +815,16 @@ public class SegmentFileStore { private static final long serialVersionUID = 3582245668420401089L; + /** + * mapping of index file parent folder to the index file folder info + */ private Map<String, FolderDetails> locationMap; - public SegmentFile merge(SegmentFile mapper) { + SegmentFile() { + locationMap = new HashMap<>(); + } + + SegmentFile merge(SegmentFile mapper) { if (this == mapper) { return this; } @@ -724,9 +848,13 @@ public class SegmentFileStore { return locationMap; } - public void setLocationMap(Map<String, FolderDetails> locationMap) { - this.locationMap = locationMap; + /** + * Add index file parent folder and the index file folder info + */ + void addPath(String path, FolderDetails details) { + locationMap.put(path, details); } + } /** @@ -736,14 +864,32 @@ public class SegmentFileStore { private static final long 
serialVersionUID = 501021868886928553L; + /** + * Based on isRelative variable: + * 1. if it is relative, it is relative path to the table path, for all index files + * 2. if it is not relative, it is the full path of all index files + */ private Set<String> files = new HashSet<>(); + /** + * all partition names + */ private List<String> partitions = new ArrayList<>(); + /** + * status for the partition, success or mark for delete + */ private String status; + /** + * file name for merge index file in this folder + */ private String mergeFileName; + /** + * true if it is relative path, for example, if user give partition location when + * adding the partition, it will be false + */ private boolean isRelative; public FolderDetails merge(FolderDetails folderDetails) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java index 4ff19cb..2b4dabe 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +35,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.data.BlockMappingVO; import org.apache.carbondata.core.mutate.data.RowCountDetailsVO; @@ -117,8 +119,7 @@ public class CarbonUpdateUtil { public static boolean updateSegmentStatus(List<SegmentUpdateDetails> updateDetailsList, CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) { boolean status = false; - SegmentUpdateStatusManager segmentUpdateStatusManager = - new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier()); + SegmentUpdateStatusManager segmentUpdateStatusManager = new SegmentUpdateStatusManager(table); ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock(); boolean lockStatus = false; @@ -419,7 +420,7 @@ public class CarbonUpdateUtil { * @param table clean up will be handled on this table. * @param forceDelete if true then max query execution timeout will not be considered. */ - public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) { + public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException { SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); @@ -432,6 +433,8 @@ public class CarbonUpdateUtil { boolean isInvalidFile = false; + List<Segment> segmentFilesToBeUpdated = new ArrayList<>(); + // scan through each segment. for (LoadMetadataDetails segment : details) { @@ -453,11 +456,13 @@ public class CarbonUpdateUtil { CarbonFile[] allSegmentFiles = segDir.listFiles(); // scan through the segment and find the carbondatafiles and index files. 
- SegmentUpdateStatusManager updateStatusManager = - new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier()); + SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table); + boolean updateSegmentFile = false; // deleting of the aborted file scenario. - deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager); + if (deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager)) { + updateSegmentFile = true; + } // get Invalid update delta files. CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager @@ -467,12 +472,9 @@ public class CarbonUpdateUtil { // now for each invalid delta file need to check the query execution time out // and then delete. - for (CarbonFile invalidFile : invalidUpdateDeltaFiles) { - compareTimestampsAndDelete(invalidFile, forceDelete, false); } - // do the same for the index files. CarbonFile[] invalidIndexFiles = updateStatusManager .getUpdateDeltaFilesList(segment.getLoadName(), false, @@ -483,10 +485,10 @@ public class CarbonUpdateUtil { // and then delete. for (CarbonFile invalidFile : invalidIndexFiles) { - - compareTimestampsAndDelete(invalidFile, forceDelete, false); + if (compareTimestampsAndDelete(invalidFile, forceDelete, false)) { + updateSegmentFile = true; + } } - // now handle all the delete delta files which needs to be deleted. // there are 2 cases here . // 1. if the block is marked as compacted then the corresponding delta files @@ -531,7 +533,11 @@ public class CarbonUpdateUtil { for (CarbonFile invalidFile : blockRelatedFiles) { - compareTimestampsAndDelete(invalidFile, forceDelete, false); + if (compareTimestampsAndDelete(invalidFile, forceDelete, false)) { + if (invalidFile.getName().endsWith(CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)) { + updateSegmentFile = true; + } + } } @@ -545,8 +551,27 @@ public class CarbonUpdateUtil { } } } + if (updateSegmentFile) { + segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName())); + } } } + String UUID = String.valueOf(System.currentTimeMillis()); + List<Segment> segmentFilesToBeUpdatedLatest = new ArrayList<>(); + for (Segment segment : segmentFilesToBeUpdated) { + String file = + SegmentFileStore.writeSegmentFile(table.getTablePath(), segment.getSegmentNo(), UUID); + segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), file)); + } + if (segmentFilesToBeUpdated.size() > 0) { + updateTableMetadataStatus( + new HashSet<Segment>(segmentFilesToBeUpdated), + table, + UUID, + false, + new ArrayList<Segment>(), + segmentFilesToBeUpdatedLatest); + } // delete the update table status files which are old. if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) { @@ -589,7 +614,7 @@ public class CarbonUpdateUtil { * @param allSegmentFiles * @param updateStatusManager */ - private static void deleteStaleCarbonDataFiles(LoadMetadataDetails segment, + private static boolean deleteStaleCarbonDataFiles(LoadMetadataDetails segment, CarbonFile[] allSegmentFiles, SegmentUpdateStatusManager updateStatusManager) { CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager .getUpdateDeltaFilesList(segment.getLoadName(), false, @@ -607,9 +632,13 @@ public class CarbonUpdateUtil { true); // now for each invalid index file need to check the query execution time out // and then delete. 
+ boolean updateSegmentFile = false; for (CarbonFile invalidFile : invalidIndexFiles) { - compareTimestampsAndDelete(invalidFile, true, false); + if (compareTimestampsAndDelete(invalidFile, true, false)) { + updateSegmentFile = true; + } } + return updateSegmentFile; } /** @@ -642,8 +671,9 @@ public class CarbonUpdateUtil { * @param forceDelete * @param isUpdateStatusFile if true then the parsing of file name logic changes. */ - private static void compareTimestampsAndDelete(CarbonFile invalidFile, - boolean forceDelete, boolean isUpdateStatusFile) { + private static boolean compareTimestampsAndDelete( + CarbonFile invalidFile, + boolean forceDelete, boolean isUpdateStatusFile) { long fileTimestamp = 0L; if (isUpdateStatusFile) { @@ -661,12 +691,14 @@ public class CarbonUpdateUtil { try { LOGGER.info("deleting the invalid file : " + invalidFile.getName()); CarbonUtil.deleteFoldersAndFiles(invalidFile); + return true; } catch (IOException e) { LOGGER.error("error in clean up of compacted files." + e.getMessage()); } catch (InterruptedException e) { LOGGER.error("error in clean up of compacted files." + e.getMessage()); } } + return false; } public static boolean isBlockInvalid(SegmentStatus blockStatus) { @@ -719,9 +751,9 @@ public class CarbonUpdateUtil { */ public static long getRowCount( BlockMappingVO blockMappingVO, - AbsoluteTableIdentifier absoluteTableIdentifier) { + CarbonTable carbonTable) { SegmentUpdateStatusManager updateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); + new SegmentUpdateStatusManager(carbonTable); long rowCount = 0; Map<String, Long> blockRowCountMap = blockMappingVO.getBlockRowCountMapping(); for (Map.Entry<String, Long> blockRowEntry : blockRowCountMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index a21873d..308fe30 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -44,6 +44,7 @@ import org.apache.carbondata.core.locks.CarbonLockFactory; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.locks.LockUsage; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.mutate.SegmentUpdateDetails; import org.apache.carbondata.core.mutate.TupleIdEnum; @@ -70,31 +71,24 @@ public class SegmentUpdateStatusManager { private Map<String, SegmentUpdateDetails> blockAndDetailsMap; private boolean isPartitionTable; - public SegmentUpdateStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier, + public SegmentUpdateStatusManager(CarbonTable table, LoadMetadataDetails[] segmentDetails) { - this.identifier = absoluteTableIdentifier; + this.identifier = table.getAbsoluteTableIdentifier(); // current it is used only for read function scenarios, as file update always requires to work // on latest file status. 
this.segmentDetails = segmentDetails; - if (segmentDetails.length > 0) { - isPartitionTable = segmentDetails[0].getSegmentFile() != null; - } + isPartitionTable = table.isHivePartitionTable(); updateDetails = readLoadMetadata(); populateMap(); } - /** - * @param identifier - */ - public SegmentUpdateStatusManager(AbsoluteTableIdentifier identifier) { - this.identifier = identifier; + public SegmentUpdateStatusManager(CarbonTable table) { + this.identifier = table.getAbsoluteTableIdentifier(); // current it is used only for read function scenarios, as file update always requires to work // on latest file status. segmentDetails = SegmentStatusManager.readLoadMetadata( CarbonTablePath.getMetadataPath(identifier.getTablePath())); - if (segmentDetails.length > 0) { - isPartitionTable = segmentDetails[0].getSegmentFile() != null; - } + isPartitionTable = table.isHivePartitionTable(); updateDetails = readLoadMetadata(); populateMap(); } @@ -261,36 +255,30 @@ public class SegmentUpdateStatusManager { * Returns all delta file paths of specified block */ private List<String> getDeltaFiles(String tupleId, String extension) throws Exception { - try { - String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID); - String completeBlockName = CarbonTablePath.addDataPartPrefix( - CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID) - + CarbonCommonConstants.FACT_FILE_EXT); - String blockPath; - if (isPartitionTable) { - blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR - + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID) - .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; - } else { - String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath( - identifier.getTablePath(), segment); - blockPath = - carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; - } - CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath)); - if (!file.exists()) { - throw new Exception("Invalid tuple id " + tupleId); - } - String blockNameWithoutExtn = completeBlockName.substring(0, completeBlockName.indexOf('.')); - //blockName without timestamp - final String blockNameFromTuple = - blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-")); - return getDeltaFiles(file, blockNameFromTuple, extension, segment); - } catch (Exception ex) { - String errorMsg = "Invalid tuple id " + tupleId; - LOG.error(errorMsg); - throw new Exception(errorMsg); + String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID); + String completeBlockName = CarbonTablePath.addDataPartPrefix( + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID) + + CarbonCommonConstants.FACT_FILE_EXT); + String blockPath; + if (isPartitionTable) { + blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID) + .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; + } else { + String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath( + identifier.getTablePath(), segment); + blockPath = + carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; + } + CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath)); + if (!file.exists()) { + throw new Exception("Invalid tuple id " + tupleId); } + String blockNameWithoutExtn = 
completeBlockName.substring(0, completeBlockName.indexOf('.')); + //blockName without timestamp + final String blockNameFromTuple = + blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-")); + return getDeltaFiles(file, blockNameFromTuple, extension, segment); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java index bc150e5..bcecdef 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java @@ -30,6 +30,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.fileoperations.FileWriteOperation; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.MergedBlockIndex; @@ -40,10 +41,19 @@ import org.apache.hadoop.fs.Path; public class CarbonIndexFileMergeWriter { /** + * table handle + */ + private CarbonTable table; + + /** * thrift writer object */ private ThriftWriter thriftWriter; + public CarbonIndexFileMergeWriter(CarbonTable table) { + this.table = table; + } + /** * Merge all the carbonindex files of segment to a merged file * @param tablePath @@ -54,7 +64,7 @@ public class CarbonIndexFileMergeWriter { * which do not store the blocklet info to current version * @throws IOException */ - private SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId, + private String mergeCarbonIndexFilesOfSegment(String segmentId, String tablePath, List<String> indexFileNamesTobeAdded, boolean readFileFooterFromCarbonDataFile) throws IOException { Segment segment = Segment.getSegment(segmentId, tablePath); @@ -70,17 +80,18 @@ public class CarbonIndexFileMergeWriter { } if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) { if (sfs == null) { - return mergeNormalSegment(indexFileNamesTobeAdded, readFileFooterFromCarbonDataFile, - segmentPath, indexFiles); + return writeMergeIndexFileBasedOnSegmentFolder( + indexFileNamesTobeAdded, readFileFooterFromCarbonDataFile, segmentPath, indexFiles); } else { - return mergePartitionSegment(indexFileNamesTobeAdded, sfs, indexFiles); + return writeMergeIndexFileBasedOnSegmentFile( + segmentId, indexFileNamesTobeAdded, sfs, indexFiles); } } return null; } - private SegmentIndexFIleMergeStatus mergeNormalSegment(List<String> indexFileNamesTobeAdded, + private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded, boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles) throws IOException { SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); @@ -100,7 +111,9 @@ public class CarbonIndexFileMergeWriter { return null; } - private SegmentIndexFIleMergeStatus mergePartitionSegment(List<String> indexFileNamesTobeAdded, + private String writeMergeIndexFileBasedOnSegmentFile( + String segmentId, + 
List<String> indexFileNamesTobeAdded, SegmentFileStore sfs, CarbonFile[] indexFiles) throws IOException { SegmentIndexFileStore fileStore = new SegmentIndexFileStore(); fileStore @@ -133,11 +146,20 @@ public class CarbonIndexFileMergeWriter { } } - List<String> filesTobeDeleted = new ArrayList<>(); + String uniqueId = String.valueOf(System.currentTimeMillis()); + String newSegmentFileName = + SegmentFileStore.genSegmentFileName(segmentId, String.valueOf(uniqueId)) + + CarbonTablePath.SEGMENT_EXT; + String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath()) + + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName; + SegmentFileStore.writeSegmentFile(sfs.getSegmentFile(), path); + SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, newSegmentFileName); + for (CarbonFile file : indexFiles) { - filesTobeDeleted.add(file.getAbsolutePath()); + file.delete(); } - return new SegmentIndexFIleMergeStatus(sfs.getSegmentFile(), filesTobeDeleted); + + return uniqueId; } private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, String segmentPath, @@ -173,7 +195,7 @@ public class CarbonIndexFileMergeWriter { * @param indexFileNamesTobeAdded * @throws IOException */ - public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId, + public String mergeCarbonIndexFilesOfSegment(String segmentId, String tablePath, List<String> indexFileNamesTobeAdded) throws IOException { return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, indexFileNamesTobeAdded, false); } @@ -184,7 +206,7 @@ public class CarbonIndexFileMergeWriter { * @param segmentId * @throws IOException */ - public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId, + public String mergeCarbonIndexFilesOfSegment(String segmentId, String tablePath) throws IOException { return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false); } @@ -196,7 +218,7 @@ public class CarbonIndexFileMergeWriter { * @param readFileFooterFromCarbonDataFile * @throws IOException */ - public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId, + public String mergeCarbonIndexFilesOfSegment(String segmentId, String tablePath, boolean readFileFooterFromCarbonDataFile) throws IOException { return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, readFileFooterFromCarbonDataFile); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java index c057129..c352a95 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java @@ -163,16 +163,13 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se UpdateVO invalidBlockVOForSegmentId = null; Boolean isIUDTable = false; - AbsoluteTableIdentifier absoluteTableIdentifier = - getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier(); - SegmentUpdateStatusManager updateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); + SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(carbonTable); isIUDTable = (updateStatusManager.getUpdateStatusDetails().length 
!= 0); // for each segment fetch blocks matching filter in Driver BTree List<CarbonInputSplit> dataBlocksOfSegment = - getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions, + getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions, validSegments, partitionInfo, oldPartitionIdList); numBlocks = dataBlocksOfSegment.size(); for (CarbonInputSplit inputSplit : dataBlocksOfSegment) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 8d2318b..c757ba9 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -319,7 +319,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { * get data blocks of given segment */ protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, - AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, + CarbonTable carbonTable, FilterResolverIntf resolver, BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) throws IOException { @@ -328,7 +328,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { // get tokens for all the required FileSystem for table path TokenCache.obtainTokensForNamenodes(job.getCredentials(), - new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration()); + new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration()); boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT)); @@ -339,7 +339,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { List<ExtendedBlocklet> prunedBlocklets; if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) { DistributableDataMapFormat datamapDstr = - new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper, segmentIds, + new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds, partitionsToPrune, BlockletDataMapFactory.class.getName()); prunedBlocklets = dataMapJob.execute(datamapDstr, resolver); // Apply expression on the blocklets. http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 4634b06..f573acf 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -117,7 +117,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter { + CarbonCommonConstants.FILE_SEPARATOR + loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp() + ".tmp"; // Merge all partition files into a single file. 
- String segmentFileName = loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp(); + String segmentFileName = SegmentFileStore.genSegmentFileName( + loadModel.getSegmentId(), String.valueOf(loadModel.getFactTimeStamp())); SegmentFileStore.SegmentFile segmentFile = SegmentFileStore .mergeSegmentFiles(readPath, segmentFileName, CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath())); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index c94f777..beb51cb 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -130,12 +130,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath())); - SegmentUpdateStatusManager updateStatusManager = - new SegmentUpdateStatusManager(identifier, loadMetadataDetails); CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); if (null == carbonTable) { throw new IOException("Missing/Corrupt schema file for table."); } + SegmentUpdateStatusManager updateStatusManager = + new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails); List<Segment> invalidSegments = new ArrayList<>(); List<UpdateVO> invalidTimestampsList = new ArrayList<>(); List<Segment> streamSegments = null; @@ -182,7 +182,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager .getUpdateStatusDetails()) { boolean refreshNeeded = - DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier) + DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable) .isRefreshNeeded(segmentUpdateDetail.getSegmentName(), updateStatusManager); if (refreshNeeded) { toBeCleanedSegments.add(new Segment(segmentUpdateDetail.getSegmentName(), null)); @@ -190,7 +190,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { } // Clean segments if refresh is needed for (Segment segment : filteredSegmentToAccess) { - if (DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier) + if (DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable) .isRefreshNeeded(segment.getSegmentNo())) { toBeCleanedSegments.add(segment); } @@ -370,12 +370,9 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { * @param targetSegment * @param oldPartitionIdList get old partitionId before partitionInfo was changed * @return - * @throws IOException */ public List<InputSplit> getSplitsOfOneSegment(JobContext job, String targetSegment, - List<Integer> oldPartitionIdList, PartitionInfo partitionInfo) - throws IOException { - AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); + List<Integer> oldPartitionIdList, PartitionInfo partitionInfo) { List<Segment> invalidSegments = new ArrayList<>(); List<UpdateVO> invalidTimestampsList = new ArrayList<>(); @@ -383,7 +380,6 @@ public class 
CarbonTableInputFormat<T> extends CarbonInputFormat<T> { segmentList.add(new Segment(targetSegment, null)); setSegmentsToAccess(job.getConfiguration(), segmentList); try { - // process and resolve the expression Expression filter = getFilterPredicates(job.getConfiguration()); CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); @@ -414,7 +410,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, tableProvider); // do block filtering and get split List<InputSplit> splits = getSplits(job, filterInterface, segmentList, matchedPartitions, - partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(identifier)); + partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(carbonTable)); // pass the invalid segment to task side in order to remove index entry in task side if (invalidSegments.size() > 0) { for (InputSplit split : splits) { @@ -473,14 +469,11 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { UpdateVO invalidBlockVOForSegmentId = null; Boolean isIUDTable = false; - AbsoluteTableIdentifier absoluteTableIdentifier = - getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier(); - isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); // for each segment fetch blocks matching filter in Driver BTree List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment = - getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions, + getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions, validSegments, partitionInfo, oldPartitionIdList); numBlocks = dataBlocksOfSegment.size(); for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) { @@ -527,18 +520,15 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { /** * Get the row count of the Block and mapping of segment and Block count. 
- * - * @param identifier - * @return - * @throws IOException */ - public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier, + public BlockMappingVO getBlockRowCount(Job job, CarbonTable table, List<PartitionSpec> partitions) throws IOException { - TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(identifier); + AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier(); + TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(table); LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath())); SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager( - identifier, loadMetadataDetails); + table, loadMetadataDetails); SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments = new SegmentStatusManager(identifier).getValidAndInvalidSegments(loadMetadataDetails); Map<String, Long> blockRowCountMapping = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java index 60c88dc..3f19a3f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java @@ -29,7 +29,7 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; @@ -48,7 +48,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl private static final String FILTER_EXP = "mapreduce.input.distributed.datamap.filter"; - private AbsoluteTableIdentifier identifier; + private CarbonTable table; private DataMapExprWrapper dataMapExprWrapper; @@ -58,10 +58,10 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl private List<PartitionSpec> partitions; - DistributableDataMapFormat(AbsoluteTableIdentifier identifier, + DistributableDataMapFormat(CarbonTable table, DataMapExprWrapper dataMapExprWrapper, List<Segment> validSegments, List<PartitionSpec> partitions, String className) { - this.identifier = identifier; + this.table = table; this.dataMapExprWrapper = dataMapExprWrapper; this.validSegments = validSegments; this.className = className; @@ -106,7 +106,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl throws IOException, InterruptedException { DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit; TableDataMap dataMap = DataMapStoreManager.getInstance() - .getDataMap(identifier, distributable.getDistributable().getDataMapSchema()); + .getDataMap(table, distributable.getDistributable().getDataMapSchema()); List<ExtendedBlocklet> blocklets 
= dataMap.prune( distributable.getDistributable(), dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala index ff421e1..fd97996 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala @@ -52,7 +52,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_merge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge") - new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) + new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0) checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), sql("""Select count(*) from carbon_automation_merge""")) @@ -69,8 +69,8 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2) val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge") - new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) - new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false) + new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) + new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false) assert(getIndexFileCount("default", 
"carbon_automation_nonmerge", "0") == 0) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 0) checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows) @@ -91,7 +91,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll { assert(getIndexFileCount("default", "carbon_automation_nonmerge", "2") == 2) sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect() val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge") - new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) + new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 0) assert(getMergedIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 1) checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala index aace3ea..c7912cf 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala @@ -62,7 +62,7 @@ class CarbonIndexFileMergeTestCase sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " + s"'GLOBAL_SORT_PARTITIONS'='100')") val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge") - new CarbonIndexFileMergeWriter() + new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) assert(getIndexFileCount("default_indexmerge", "0") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), @@ -85,9 +85,9 @@ class CarbonIndexFileMergeTestCase assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") - new CarbonIndexFileMergeWriter() + new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) - new CarbonIndexFileMergeWriter() + new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false) assert(getIndexFileCount("default_nonindexmerge", "0") == 0) assert(getIndexFileCount("default_nonindexmerge", "1") == 0) @@ -110,9 +110,9 @@ class CarbonIndexFileMergeTestCase assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") - new CarbonIndexFileMergeWriter() + new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false) - new CarbonIndexFileMergeWriter() + new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false) 
assert(getIndexFileCount("default_nonindexmerge", "0") == 0) assert(getIndexFileCount("default_nonindexmerge", "1") == 0) @@ -139,7 +139,7 @@ class CarbonIndexFileMergeTestCase assert(getIndexFileCount("default_nonindexmerge", "1") == 100) sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect() val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") - new CarbonIndexFileMergeWriter() + new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0) checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows) @@ -168,7 +168,7 @@ class CarbonIndexFileMergeTestCase assert(getIndexFileCount("default_nonindexmerge", "3") == 100) sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect() val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge") - new CarbonIndexFileMergeWriter() + new CarbonIndexFileMergeWriter(table) .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false) assert(getIndexFileCount("default_nonindexmerge", "0") == 100) assert(getIndexFileCount("default_nonindexmerge", "1") == 100) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala index 510903a..1fbddb0 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala @@ -16,11 +16,15 @@ */ package org.apache.carbondata.spark.testsuite.iud +import java.io.File + import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.{CarbonEnv, Row, SaveMode} import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.path.CarbonTablePath class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { @@ -178,6 +182,8 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("clean files for table select_after_clean") sql("delete from select_after_clean where name='def'") sql("clean files for table select_after_clean") + assertResult(false)(new File( + CarbonTablePath.getSegmentPath(s"$storeLocation/iud_db.db/select_after_clean", "0")).exists()) checkAnswer(sql("""select * from select_after_clean"""), Seq(Row(1, "abc"), Row(3, "uhj"), Row(4, "frg"))) } @@ -198,7 +204,9 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { val files = FileFactory.getCarbonFile(metaPath) val result = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.getClass if(result.getCanonicalName.contains("CarbonFileMetastore")) { - assert(files.listFiles().length == 2) + assert(files.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = !file.isDirectory + }).length == 2) } else assert(files.listFiles().length == 1) 
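
Note on the test changes above: they track two behavioural changes in this commit. CarbonIndexFileMergeWriter is now constructed with the CarbonTable, presumably so that after merging a segment's .carbonindex files it can rewrite that segment's segment file, and the table's Metadata folder now also holds a directory for the newly written segment files, which is why the listFiles assertion has to skip directories. A minimal sketch of the new caller pattern, assuming an already-loaded table ("default"/"indexmerge" are placeholder names taken from the tests; getIndexFileCount is a test helper, not shown here):

    import org.apache.carbondata.core.metadata.CarbonMetadata
    import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter

    // Look up the freshly loaded table from the metadata cache.
    val table = CarbonMetadata.getInstance().getCarbonTable("default", "indexmerge")
    // The writer is now constructed per table rather than stateless; merging
    // segment "0" collapses its .carbonindex files into one merged index file.
    new CarbonIndexFileMergeWriter(table)
      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
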
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index 98c9a16..ec39f66 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -398,12 +398,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("""drop table if exists iud.show_segment""").show sql("""create table iud.show_segment (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""") sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.show_segment""") - val before_update = sql("""show segments for table iud.show_segment""").toDF() + val before_update = sql("""show segments for table iud.show_segment""").collect() sql("""update iud.show_segment d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show() - val after_update = sql("""show segments for table iud.show_segment""").toDF() + val after_update = sql("""show segments for table iud.show_segment""") checkAnswer( - before_update, - after_update + after_update, + before_update ) sql("""drop table if exists iud.show_segment""").show } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index c9237d1..b4c5e4d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -274,8 +274,9 @@ class CarbonMergerRDD[K, V]( val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from( tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId) ) + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager( - absoluteTableIdentifier) + carbonTable) val jobConf: JobConf = new JobConf(new Configuration) SparkHadoopUtil.get.addCredentials(jobConf) val job: Job = new Job(jobConf) @@ -383,7 +384,6 @@ class CarbonMergerRDD[K, V]( dataFileFooter.getSegmentInfo.getColumnCardinality) } val updatedMaxSegmentColumnList = new util.ArrayList[ColumnSchema]() - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable // update cardinality and column schema list according to master schema val cardinality = CarbonCompactionUtil .updateColumnSchemaAndGetCardinality(columnToCardinalityMap, http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala index 48ebdb4..37cd12e 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala @@ -28,11 +28,13 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.spark.sql.execution.command.AlterPartitionModel +import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo} -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.CarbonInputSplit @@ -203,5 +205,22 @@ object PartitionUtils { files.add(file) } CarbonUtil.deleteFiles(files.asScala.toArray) + if (!files.isEmpty) { + val carbonTable = alterPartitionModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val file = SegmentFileStore.writeSegmentFile( + identifier.getTablePath, + alterPartitionModel.segmentId, + alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString) + val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file)).asJava + if (!CarbonUpdateUtil.updateTableMetadataStatus( + new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId, null)).asJava), + carbonTable, + alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString, + true, + new util.ArrayList[Segment](0), + new util.ArrayList[Segment](segmentFiles))) { + throw new IOException("Data update failed due to failure in table status updation.") + } + } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 0d00023..bbe7be0 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -51,7 +51,7 @@ import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} import org.apache.carbondata.core.dictionary.server.DictionaryServer import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion} +import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore} import org.apache.carbondata.core.metadata.datatype.DataTypes import 
org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -291,7 +291,6 @@ object CarbonDataRDDFactory { dataFrame: Option[DataFrame] = None, updateModel: Option[UpdateTableModel] = None, operationContext: OperationContext): Unit = { - val storePath: String = carbonLoadModel.getTablePath LOGGER.audit(s"Data load request has been received for table" + s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") // Check if any load need to be deleted before loading new data @@ -428,6 +427,13 @@ object CarbonDataRDDFactory { segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null)) } } + val segmentFiles = segmentDetails.asScala.map{seg => + val file = SegmentFileStore.writeSegmentFile( + carbonTable.getTablePath, + seg.getSegmentNo, + updateModel.get.updatedTimeStamp.toString) + new Segment(seg.getSegmentNo, file) + }.filter(_.getSegmentFileName != null).asJava // this means that the update doesnt have any records to update so no need to do table // status file updation. @@ -441,7 +447,8 @@ object CarbonDataRDDFactory { carbonTable, updateModel.get.updatedTimeStamp + "", true, - new util.ArrayList[Segment](0))) { + new util.ArrayList[Segment](0), + new util.ArrayList[Segment](segmentFiles))) { LOGGER.audit("Data update is successful for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } else { @@ -499,6 +506,11 @@ object CarbonDataRDDFactory { } writeDictionary(carbonLoadModel, result, writeAll = false) + + val segmentFileName = + SegmentFileStore.writeSegmentFile(carbonTable.getTablePath, carbonLoadModel.getSegmentId, + String.valueOf(carbonLoadModel.getFactTimeStamp)) + operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment", carbonLoadModel.getSegmentId) val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = @@ -513,6 +525,7 @@ object CarbonDataRDDFactory { loadStatus, newEntryLoadStatus, overwriteTable, + segmentFileName, uniqueTableStatusId) val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent = new LoadTablePostStatusUpdateEvent(carbonLoadModel) @@ -788,6 +801,7 @@ object CarbonDataRDDFactory { loadStatus: SegmentStatus, newEntryLoadStatus: SegmentStatus, overwriteTable: Boolean, + segmentFileName: String, uuid: String = ""): Boolean = { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val metadataDetails = if (status != null && status.size > 0 && status(0) != null) { @@ -795,11 +809,12 @@ object CarbonDataRDDFactory { } else { new LoadMetadataDetails } + metadataDetails.setSegmentFile(segmentFileName) CarbonLoaderUtil.populateNewLoadMetaEntry( - metadataDetails, - newEntryLoadStatus, - carbonLoadModel.getFactTimeStamp, - true) + metadataDetails, + newEntryLoadStatus, + carbonLoadModel.getFactTimeStamp, + true) CarbonLoaderUtil .addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable) val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
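
Note on the CarbonDataRDDFactory change above, which is the core of this commit for non-partition tables: once a load or update finishes, SegmentFileStore.writeSegmentFile records the segment's index-file locations under the table path, and the returned segment file name is set on the segment's LoadMetadataDetails before the table status is recorded, so later queries can resolve index files without listing the segment directory. A condensed sketch of that commit sequence, assuming a populated CarbonLoadModel for a finished load (import paths follow the 1.4.x source layout; the trailing recordNewLoadMetadata arguments are truncated in the diff above and therefore left out):

    import org.apache.carbondata.core.metadata.SegmentFileStore
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    import org.apache.carbondata.processing.util.CarbonLoaderUtil

    def commitLoadedSegment(carbonTable: CarbonTable, carbonLoadModel: CarbonLoadModel): Unit = {
      // 1. Write the segment file for the segment that was just loaded; the
      //    returned name is what the status entry will point at.
      val segmentFileName = SegmentFileStore.writeSegmentFile(
        carbonTable.getTablePath,
        carbonLoadModel.getSegmentId,
        String.valueOf(carbonLoadModel.getFactTimeStamp))

      // 2. Attach the segment file to the new load entry before recording it.
      val metadataDetails = new LoadMetadataDetails
      metadataDetails.setSegmentFile(segmentFileName)
      CarbonLoaderUtil.populateNewLoadMetaEntry(
        metadataDetails, SegmentStatus.SUCCESS, carbonLoadModel.getFactTimeStamp, true)

      // 3. The entry is then persisted via CarbonLoaderUtil.recordNewLoadMetadata,
      //    exactly as in the factory code above.
    }
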
