Repository: carbondata Updated Branches: refs/heads/master d5bec4dd7 -> 72f50b507
[CARBONDATA-2258] Separate visible and invisible segments info into two files to reduce the size of tablestatus file. The tablestatus file keeps growing, and because many places scan this file, its size degrades the performance of reading it. As agreed in the discussion thread, every time the command 'CLEAN FILES FOR TABLE' is executed (in method 'SegmentStatusManager.deleteLoadsAndUpdateMetadata'), the invisible segment list is appended to a file called 'tablestatus.history', so that visible and invisible segments are separated into two files (the tablestatus file and the tablestatus.history file). This closes #2091 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/72f50b50 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/72f50b50 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/72f50b50 Branch: refs/heads/master Commit: 72f50b507dcc405e41904bbddada1e51976d0669 Parents: d5bec4d Author: Zhang Zhichao <[email protected]> Authored: Fri Mar 23 01:39:36 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Sun Mar 25 00:01:53 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 18 ++- .../statusmanager/SegmentStatusManager.java | 138 ++++++++++++++++++- .../carbondata/core/util/CarbonProperties.java | 27 +++- .../core/util/path/CarbonTablePath.java | 9 ++ .../MajorCompactionIgnoreInMinorTest.scala | 66 ++++----- .../apache/spark/util/CarbonCommandSuite.scala | 25 ++++ 6 files changed, 240 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/72f50b50/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 33a1884..45d15fe 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1602,8 +1602,22 @@ public final class CarbonCommonConstants { // default value is 2 days public static final String CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT = "48"; + /** + * The number of invisible segment info which will be preserved in tablestatus file, + * if it exceeds this value, they will be removed and write to tablestatus.history file. + */ + @CarbonProperty + public static final String CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT = + "carbon.invisible.segments.preserve.count"; + + /** + * default value is 200, it means that it will preserve 200 invisible segment info + * in tablestatus file. + * The size of one segment info is about 500 bytes, so the size of tablestatus file + * will remain at 100KB. 
+ */ + public static final String CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT = "200"; + private CarbonCommonConstants() { } } - - http://git-wip-us.apache.org/repos/asf/carbondata/blob/72f50b50/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index aa73fee..06cf76f 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -47,6 +47,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DeleteLoadFolders; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -201,6 +202,22 @@ public class SegmentStatusManager { return readTableStatusFile(tableStatusFileName); } + /** + * This method reads the load history metadata file + * + * @param metadataFolderPath + * @return + */ + public static LoadMetadataDetails[] readLoadHistoryMetadata(String metadataFolderPath) { + String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR + + CarbonTablePath.TABLE_STATUS_HISTORY_FILE; + try { + return readTableStatusFile(metadataFileName); + } catch (IOException e) { + return new LoadMetadataDetails[0]; + } + } + public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath) throws IOException { Gson gsonObjectToRead = new Gson(); @@ -884,11 +901,33 @@ 
public class SegmentStatusManager { LoadMetadataDetails[] latestMetadata = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); - // update the metadata details from old to new status. - List<LoadMetadataDetails> latestStatus = - updateLoadMetadataFromOldToNew(tuple2.details, latestMetadata); - - writeLoadMetadata(identifier, latestStatus); + int invisibleSegmentPreserveCnt = + CarbonProperties.getInstance().getInvisibleSegmentPreserveCount(); + int invisibleSegmentCnt = SegmentStatusManager.countInvisibleSegments(tuple2.details); + // if execute command 'clean files' or the number of invisible segment info + // exceeds the value of 'carbon.invisible.segments.preserve.count', + // it need to append the invisible segment list to 'tablestatus.history' file. + if (isForceDeletion || (invisibleSegmentCnt > invisibleSegmentPreserveCnt)) { + TableStatusReturnTuple tableStatusReturn = separateVisibleAndInvisibleSegments( + tuple2.details, latestMetadata, invisibleSegmentCnt); + LoadMetadataDetails[] oldLoadHistoryList = readLoadHistoryMetadata( + carbonTable.getMetadataPath()); + LoadMetadataDetails[] newLoadHistoryList = appendLoadHistoryList( + oldLoadHistoryList, tableStatusReturn.arrayOfLoadHistoryDetails); + writeLoadDetailsIntoFile( + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()), + tableStatusReturn.arrayOfLoadDetails); + writeLoadDetailsIntoFile( + CarbonTablePath.getTableStatusHistoryFilePath(carbonTable.getTablePath()), + newLoadHistoryList); + } else { + // update the metadata details from old to new status. 
+ List<LoadMetadataDetails> latestStatus = + updateLoadMetadataFromOldToNew(tuple2.details, latestMetadata); + writeLoadMetadata(identifier, latestStatus); + } + DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion( + identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs); } else { String dbName = identifier.getCarbonTableIdentifier().getDatabaseName(); String tableName = identifier.getCarbonTableIdentifier().getTableName(); @@ -900,8 +939,6 @@ public class SegmentStatusManager { LOG.error(errorMsg); throw new IOException(errorMsg + " Please try after some time."); } - DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion( - identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs); } finally { if (locked) { CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); @@ -913,4 +950,91 @@ public class SegmentStatusManager { CarbonLockUtil.deleteExpiredSegmentLockFiles(carbonTable); } + /** + * Get the number of invisible segment info from segment info list. + */ + public static int countInvisibleSegments(LoadMetadataDetails[] segmentList) { + int invisibleSegmentCnt = 0; + if (segmentList.length != 0) { + for (LoadMetadataDetails eachSeg : segmentList) { + // can not remove segment 0, there are some info will be used later + // for example: updateStatusFileName + if (!eachSeg.getLoadName().equalsIgnoreCase("0") + && eachSeg.getVisibility().equalsIgnoreCase("false")) { + invisibleSegmentCnt += 1; + } + } + } + return invisibleSegmentCnt; + } + + private static class TableStatusReturnTuple { + LoadMetadataDetails[] arrayOfLoadDetails; + LoadMetadataDetails[] arrayOfLoadHistoryDetails; + TableStatusReturnTuple(LoadMetadataDetails[] arrayOfLoadDetails, + LoadMetadataDetails[] arrayOfLoadHistoryDetails) { + this.arrayOfLoadDetails = arrayOfLoadDetails; + this.arrayOfLoadHistoryDetails = arrayOfLoadHistoryDetails; + } + } + + /** + * Separate visible and invisible segments into two array. 
+ */ + public static TableStatusReturnTuple separateVisibleAndInvisibleSegments( + LoadMetadataDetails[] oldList, + LoadMetadataDetails[] newList, + int invisibleSegmentCnt) { + int newSegmentsLength = newList.length; + int visibleSegmentCnt = newSegmentsLength - invisibleSegmentCnt; + LoadMetadataDetails[] arrayOfVisibleSegments = new LoadMetadataDetails[visibleSegmentCnt]; + LoadMetadataDetails[] arrayOfInvisibleSegments = new LoadMetadataDetails[invisibleSegmentCnt]; + int oldSegmentsLength = oldList.length; + int visibleIdx = 0; + int invisibleIdx = 0; + for (int i = 0; i < newSegmentsLength; i++) { + LoadMetadataDetails newSegment = newList[i]; + if (i < oldSegmentsLength) { + LoadMetadataDetails oldSegment = oldList[i]; + if (newSegment.getLoadName().equalsIgnoreCase("0")) { + newSegment.setVisibility(oldSegment.getVisibility()); + arrayOfVisibleSegments[visibleIdx] = newSegment; + visibleIdx++; + } else if ("false".equalsIgnoreCase(oldSegment.getVisibility())) { + newSegment.setVisibility("false"); + arrayOfInvisibleSegments[invisibleIdx] = newSegment; + invisibleIdx++; + } else { + arrayOfVisibleSegments[visibleIdx] = newSegment; + visibleIdx++; + } + } else { + arrayOfVisibleSegments[visibleIdx] = newSegment; + visibleIdx++; + } + } + return new TableStatusReturnTuple(arrayOfVisibleSegments, arrayOfInvisibleSegments); + } + + /** + * Return an array containing all invisible segment entries in appendList and historyList. 
+ */ + public static LoadMetadataDetails[] appendLoadHistoryList( + LoadMetadataDetails[] historyList, + LoadMetadataDetails[] appendList) { + int historyListLen = historyList.length; + int appendListLen = appendList.length; + int newListLen = historyListLen + appendListLen; + LoadMetadataDetails[] newList = new LoadMetadataDetails[newListLen]; + int newListIdx = 0; + for (int i = 0; i < historyListLen; i++) { + newList[newListIdx] = historyList[i]; + newListIdx++; + } + for (int i = 0; i < appendListLen; i++) { + newList[newListIdx] = appendList[i]; + newListIdx++; + } + return newList; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/72f50b50/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 4d8caed..7ed2b0f 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -1399,7 +1399,7 @@ public final class CarbonProperties { } catch (NumberFormatException exc) { LOGGER.error( "The heap memory pooling threshold bytes is invalid. Using the default value " - + CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT); + + CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT); thresholdSize = Integer.parseInt( CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT); } @@ -1419,11 +1419,32 @@ public final class CarbonProperties { preserveSeconds = preserveHours * 3600 * 1000L; } catch (NumberFormatException exc) { LOGGER.error( - "The segment lock files preserv hours is invalid. 
Using the default value " - + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT); + "The value of '" + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS + + "' is invalid. Using the default value " + + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT); preserveSeconds = Integer.parseInt( CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT) * 3600 * 1000L; } return preserveSeconds; } + + /** + * Get the number of invisible segment info which will be preserved in tablestatus file. + */ + public int getInvisibleSegmentPreserveCount() { + int preserveCnt; + try { + preserveCnt = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT, + CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT)); + } catch (NumberFormatException exc) { + LOGGER.error( + "The value of '" + CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT + + "' is invalid. 
Using the default value " + + CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT); + preserveCnt = Integer.parseInt( + CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT); + } + return preserveCnt; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/72f50b50/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index a37d5cd..aee18da 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -45,6 +45,7 @@ public class CarbonTablePath { private static final String LOCK_DIR = "LockFiles"; public static final String TABLE_STATUS_FILE = "tablestatus"; + public static final String TABLE_STATUS_HISTORY_FILE = "tablestatus.history"; public static final String CARBON_DATA_EXT = ".carbondata"; public static final String INDEX_FILE_EXT = ".carbonindex"; public static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge"; @@ -663,4 +664,12 @@ public class CarbonTablePath { public static boolean isSegmentLockFilePath(String lockFileName) { return lockFileName.startsWith(SEGMENT_PREFIX) && lockFileName.endsWith(LockUsage.LOCK); } + + /** + * Return table status history file path based on `tablePath` + */ + public static String getTableStatusHistoryFilePath(String tablePath) { + return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + + TABLE_STATUS_HISTORY_FILE; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/72f50b50/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala ---------------------------------------------------------------------- 
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala index 9afb890..5ab156c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala @@ -40,6 +40,9 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll val csvFilePath3 = s"$resourcesPath/compaction/compaction3.csv" override def beforeAll { + } + + def createTableAndLoadData(): Unit = { CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2") sql("drop table if exists ignoremajor") CarbonProperties.getInstance() @@ -67,43 +70,13 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll ) sql("alter table ignoremajor compact 'minor'" ) - - } - - /** - * Test whether major compaction is not included in minor compaction. 
- */ - test("delete merged folder and check segments") { - // delete merged segments - sql("clean files for table ignoremajor") - - val carbonTable = CarbonMetadata.getInstance().getCarbonTable( - CarbonCommonConstants.DATABASE_DEFAULT_NAME, - "ignoremajor" - ) - val absoluteTableIdentifier = carbonTable - .getAbsoluteTableIdentifier - val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager( - absoluteTableIdentifier) - - // merged segment should not be there - val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.map(_.getSegmentNo).toList - assert(segments.contains("0.1")) - assert(segments.contains("2.1")) - assert(!segments.contains("2")) - assert(!segments.contains("3")) - val cacheClient = new CacheClient() - val segmentIdentifier = new TableSegmentUniqueIdentifier(absoluteTableIdentifier, "2") - val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentAccessClient. - getIfPresent(segmentIdentifier) - assert(null == wrapper) - } /** * Delete should not work on compacted segment. */ test("delete compacted segment and check status") { + createTableAndLoadData() intercept[Throwable] { sql("delete from table ignoremajor where segment.id in (2)") } @@ -124,6 +97,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll * Delete should not work on compacted segment. */ test("delete compacted segment by date and check status") { + createTableAndLoadData() sql( "delete from table ignoremajor where segment.starttime before " + " '2222-01-01 19:35:01'" @@ -142,6 +116,36 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll } /** + * Test whether major compaction is not included in minor compaction. 
+ */ + test("delete merged folder and check segments") { + createTableAndLoadData() + // delete merged segments + sql("clean files for table ignoremajor") + + val carbonTable = CarbonMetadata.getInstance().getCarbonTable( + CarbonCommonConstants.DATABASE_DEFAULT_NAME, + "ignoremajor" + ) + val absoluteTableIdentifier = carbonTable + .getAbsoluteTableIdentifier + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager( + absoluteTableIdentifier) + + // merged segment should not be there + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.map(_.getSegmentNo).toList + assert(segments.contains("0.1")) + assert(segments.contains("2.1")) + assert(!segments.contains("2")) + assert(!segments.contains("3")) + val cacheClient = new CacheClient() + val segmentIdentifier = new TableSegmentUniqueIdentifier(absoluteTableIdentifier, "2") + val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentAccessClient. + getIfPresent(segmentIdentifier) + assert(null == wrapper) + } + + /** * Execute two major compactions sequentially */ test("Execute two major compactions sequentially") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/72f50b50/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala index 8ff6cab..7dd6c7e 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala @@ -28,6 +28,8 @@ import org.apache.carbondata.api.CarbonStore import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonMetadata +import 
org.apache.carbondata.core.statusmanager.LoadMetadataDetails +import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll { @@ -165,6 +167,29 @@ class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll { dropTable("preagg1") } + test("separate visible and invisible segments info into two files") { + val tableName = "test_tablestatus_history" + sql(s"drop table if exists ${tableName}") + sql(s"create table ${tableName} (name String, age int) stored by 'carbondata' " + + "TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')") + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName) + sql(s"insert into ${tableName} select 'abc1',1") + sql(s"insert into ${tableName} select 'abc2',2") + sql(s"insert into ${tableName} select 'abc3',3") + assert(sql(s"show segments for table ${tableName}").collect().length == 4) + var detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + var historyDetail = SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath) + assert(detail.length == 4) + assert(historyDetail.length == 0) + sql(s"clean files for table ${tableName}") + assert(sql(s"show segments for table ${tableName}").collect().length == 2) + detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + historyDetail = SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath) + assert(detail.length == 3) + assert(historyDetail.length == 1) + dropTable(tableName) + } + protected def dropTable(tableName: String): Unit ={ sql(s"DROP TABLE IF EXISTS $tableName") }
