[CARBONDATA-1626] Add data size and index size to the table status file. This closes #1435
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/589f126d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/589f126d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/589f126d Branch: refs/heads/fgdatamap Commit: 589f126dea872f54c2096c9572436bf10589b1ca Parents: f22e614 Author: akashrn5 <akashnilu...@gmail.com> Authored: Wed Oct 25 15:27:37 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Fri Nov 17 21:43:15 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 26 +++ .../core/datastore/impl/FileFactory.java | 2 +- .../core/statusmanager/LoadMetadataDetails.java | 18 ++ .../apache/carbondata/core/util/CarbonUtil.java | 152 ++++++++++++++++ .../core/util/path/CarbonTablePath.java | 8 + .../spark/rdd/CarbonDataRDDFactory.scala | 11 +- .../CarbonDescribeFormattedCommand.scala | 10 ++ .../spark/sql/GetDataSizeAndIndexSizeTest.scala | 172 +++++++++++++++++++ .../processing/merger/CarbonDataMergerUtil.java | 7 +- 9 files changed, 398 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 0a7dfdd..762ef6d 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1380,6 +1380,32 @@ public final class CarbonCommonConstants { public static final String AGGREGATIONDATAMAPSCHEMA = "AggregateDataMapHandler"; + /* 
+ * The total size of carbon data + */ + public static final String CARBON_TOTAL_DATA_SIZE = "datasize"; + + /** + * The total size of carbon index + */ + public static final String CARBON_TOTAL_INDEX_SIZE = "indexsize"; + + /** + * ENABLE_CALCULATE_DATA_INDEX_SIZE + */ + @CarbonProperty public static final String ENABLE_CALCULATE_SIZE = "carbon.enable.calculate.size"; + + /** + * DEFAULT_ENABLE_CALCULATE_DATA_INDEX_SIZE + */ + @CarbonProperty public static final String DEFAULT_ENABLE_CALCULATE_SIZE = "true"; + + public static final String TABLE_DATA_SIZE = "Table Data Size"; + + public static final String TABLE_INDEX_SIZE = "Table Index Size"; + + public static final String LAST_UPDATE_TIME = "Last Update Time"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index 57a48ec..240253d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -541,7 +541,7 @@ public final class FileFactory { * @param fileType * @return updated file path without url for local */ - private static String getUpdatedFilePath(String filePath, FileType fileType) { + public static String getUpdatedFilePath(String filePath, FileType fileType) { switch (fileType) { case HDFS: case ALLUXIO: http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java index d838e2e..b282d53 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java @@ -41,6 +41,24 @@ public class LoadMetadataDetails implements Serializable { private String partitionCount; private String isDeleted = CarbonCommonConstants.KEYWORD_FALSE; + private String dataSize; + private String indexSize; + + public String getDataSize() { + return dataSize; + } + + public void setDataSize(String dataSize) { + this.dataSize = dataSize; + } + + public String getIndexSize() { + return indexSize; + } + + public void setIndexSize(String indexSize) { + this.indexSize = indexSize; + } // update delta end timestamp private String updateDeltaEndTimestamp = ""; http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 3c177dc..9d6acb6 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -58,6 +58,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.exception.InvalidConfigurationException; import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor; +import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; @@ -67,6 +68,7 @@ import 
org.apache.carbondata.core.metadata.blocklet.SegmentInfo; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; @@ -77,6 +79,9 @@ import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator; import org.apache.carbondata.core.scan.model.QueryDimension; import org.apache.carbondata.core.service.CarbonCommonFactory; import org.apache.carbondata.core.service.PathService; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -86,8 +91,11 @@ import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.DataChunk3; import com.google.gson.Gson; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TBase; @@ -2140,5 +2148,149 @@ public final class CarbonUtil { return parentPath.toString() + CarbonCommonConstants.FILE_SEPARATOR + carbonTableIdentifier .getTableName(); } + + /* + * This method will add data size and index size into tablestatus for each segment + */ + public static void 
addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails, + String segmentId, CarbonTable carbonTable) throws IOException { + CarbonTablePath carbonTablePath = + CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier())); + Map<String, Long> dataIndexSize = + CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId); + loadMetadataDetails + .setDataSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString()); + loadMetadataDetails + .setIndexSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString()); + } + + /** + * This method will calculate the data size and index size for carbon table + */ + public static Map<String, Long> calculateDataIndexSize(CarbonTable carbonTable) + throws IOException { + Map<String, Long> dataIndexSizeMap = new HashMap<String, Long>(); + long dataSize = 0L; + long indexSize = 0L; + long lastUpdateTime = 0L; + boolean needUpdate = false; + AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier(); + CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); + String isCalculated = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.ENABLE_CALCULATE_SIZE, + CarbonCommonConstants.DEFAULT_ENABLE_CALCULATE_SIZE); + if (isCalculated.equalsIgnoreCase("true")) { + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); + try { + if (carbonLock.lockWithRetries()) { + LOGGER.info("Acquired lock for table for table status updation"); + String metadataPath = carbonTable.getMetaDataFilepath(); + LoadMetadataDetails[] loadMetadataDetails = + SegmentStatusManager.readLoadMetadata(metadataPath); + + for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) { + SegmentStatus loadStatus = loadMetadataDetail.getSegmentStatus(); + if (loadStatus == SegmentStatus.SUCCESS || 
loadStatus == + SegmentStatus.LOAD_PARTIAL_SUCCESS) { + String dsize = loadMetadataDetail.getDataSize(); + String isize = loadMetadataDetail.getIndexSize(); + // If it is old segment, need to calculate data size and index size again + if (null == dsize || null == isize) { + needUpdate = true; + LOGGER.info("It is an old segment, need calculate data size and index size again"); + HashMap<String, Long> map = CarbonUtil + .getDataSizeAndIndexSize(carbonTablePath, loadMetadataDetail.getLoadName()); + dsize = String.valueOf(map.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE)); + isize = String.valueOf(map.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE)); + loadMetadataDetail.setDataSize(dsize); + loadMetadataDetail.setIndexSize(isize); + } + dataSize += Long.parseLong(dsize); + indexSize += Long.parseLong(isize); + } + } + // If it contains old segment, write new load details + if (needUpdate) { + SegmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), + loadMetadataDetails); + } + String tableStatusPath = carbonTablePath.getTableStatusFilePath(); + if (FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) { + lastUpdateTime = + FileFactory.getCarbonFile(tableStatusPath, FileFactory.getFileType(tableStatusPath)) + .getLastModifiedTime(); + } + dataIndexSizeMap + .put(String.valueOf(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE), dataSize); + dataIndexSizeMap + .put(String.valueOf(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE), indexSize); + dataIndexSizeMap + .put(String.valueOf(CarbonCommonConstants.LAST_UPDATE_TIME), lastUpdateTime); + } else { + LOGGER.error("Not able to acquire the lock for Table status updation for table"); + } + } finally { + if (carbonLock.unlock()) { + LOGGER.info("Table unlocked successfully after table status updation"); + } else { + LOGGER.error("Unable to unlock Table lock for table during table status updation"); + } + } + } + return dataIndexSizeMap; + } + + // Get the total 
size of carbon data and the total size of carbon index + public static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath, + String segmentId) throws IOException { + long carbonDataSize = 0L; + long carbonIndexSize = 0L; + HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>(); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + FileFactory.FileType fileType = FileFactory.getFileType(segmentPath); + switch (fileType) { + case HDFS: + case ALLUXIO: + case VIEWFS: + case S3: + Path path = new Path(segmentPath); + FileSystem fs = path.getFileSystem(FileFactory.getConfiguration()); + FileStatus[] fileStatuses = fs.listStatus(path); + if (null != fileStatuses) { + for (FileStatus dataAndIndexStatus : fileStatuses) { + String pathName = dataAndIndexStatus.getPath().getName(); + if (pathName.endsWith(CarbonTablePath.getCarbonIndexExtension()) || pathName + .endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) { + carbonIndexSize += dataAndIndexStatus.getLen(); + } else if (pathName.endsWith(CarbonTablePath.getCarbonDataExtension())) { + carbonDataSize += dataAndIndexStatus.getLen(); + } + } + } + break; + case LOCAL: + default: + segmentPath = FileFactory.getUpdatedFilePath(segmentPath, fileType); + File file = new File(segmentPath); + File[] segmentFiles = file.listFiles(); + if (null != segmentFiles) { + for (File dataAndIndexFile : segmentFiles) { + if (dataAndIndexFile.getCanonicalPath() + .endsWith(CarbonTablePath.getCarbonIndexExtension()) || dataAndIndexFile + .getCanonicalPath().endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) { + carbonIndexSize += FileUtils.sizeOf(dataAndIndexFile); + } else if (dataAndIndexFile.getCanonicalPath() + .endsWith(CarbonTablePath.getCarbonDataExtension())) { + carbonDataSize += FileUtils.sizeOf(dataAndIndexFile); + } + } + } + } + dataAndIndexSize.put(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE, carbonDataSize); + 
dataAndIndexSize.put(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE, carbonIndexSize); + return dataAndIndexSize; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index d363ac3..376a71f 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -681,6 +681,14 @@ public class CarbonTablePath extends Path { } /** + * + * @return carbon index merge file extension + */ + public static String getCarbonMergeIndexExtension() { + return MERGE_INDEX_FILE_EXT; + } + + /** * This method will remove strings in path and return short block id * * @param blockId http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index e32c407..7dad243 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -55,11 +55,8 @@ import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.FailureCauses -import 
org.apache.carbondata.processing.loading.csvinput.BlockDetails -import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat -import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable -import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException -import org.apache.carbondata.processing.loading.exception.NoRetryException +import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable} +import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException} import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.loading.sort.SortScopeOptions import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} @@ -292,6 +289,7 @@ object CarbonDataRDDFactory { var executorMessage: String = "" val isSortTable = carbonTable.getNumberOfSortColumns > 0 val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope) + try { if (updateModel.isDefined) { res = loadDataFrameForUpdate( @@ -748,6 +746,7 @@ object CarbonDataRDDFactory { loadStatus: SegmentStatus, overwriteTable: Boolean ): Boolean = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val metadataDetails = if (status != null && status(0) != null) { status(0)._2._1 } else { @@ -758,6 +757,8 @@ object CarbonDataRDDFactory { loadStatus, carbonLoadModel.getFactTimeStamp, true) + CarbonUtil + .addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable) val done = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, carbonLoadModel, false, overwriteTable) if (!done) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala index b233c99..b61078b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala @@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.CarbonUtil private[sql] case class CarbonDescribeFormattedCommand( child: SparkPlan, @@ -112,6 +113,15 @@ private[sql] case class CarbonDescribeFormattedCommand( .getOrDefault(CarbonCommonConstants.TABLE_COMMENT, "") results ++= Seq(("Comment: ", tableComment, "")) results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", "")) + val dataIndexSize = CarbonUtil.calculateDataIndexSize(carbonTable) + if (!dataIndexSize.isEmpty) { + results ++= Seq((CarbonCommonConstants.TABLE_DATA_SIZE + ":", + dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString, "")) + results ++= Seq((CarbonCommonConstants.TABLE_INDEX_SIZE + ":", + dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString, "")) + results ++= Seq((CarbonCommonConstants.LAST_UPDATE_TIME + ":", + dataIndexSize.get(CarbonCommonConstants.LAST_UPDATE_TIME).toString, "")) + } results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/integration/spark2/src/test/scala/org/apache/spark/sql/GetDataSizeAndIndexSizeTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/GetDataSizeAndIndexSizeTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/GetDataSizeAndIndexSizeTest.scala new file mode 100644 index 0000000..03ec3a1 --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/GetDataSizeAndIndexSizeTest.scala @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.test.util.QueryTest +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.scalatest.BeforeAndAfterAll + +class GetDataSizeAndIndexSizeTest extends QueryTest with BeforeAndAfterAll { + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS tableSize1") + sql("DROP TABLE IF EXISTS tableSize2") + sql("DROP TABLE IF EXISTS tableSize3") + sql("DROP TABLE IF EXISTS tableSize4") + sql("DROP TABLE IF EXISTS tableSize5") + sql("DROP TABLE IF EXISTS tableSize6") + sql("DROP TABLE IF EXISTS tableSize7") + sql("DROP TABLE IF EXISTS tableSize8") + sql("DROP TABLE IF EXISTS tableSize9") + sql("DROP TABLE IF EXISTS tableSize10") + sql("DROP TABLE IF EXISTS tableSize11") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS tableSize1") + sql("DROP TABLE IF EXISTS tableSize2") + sql("DROP TABLE IF EXISTS tableSize3") + sql("DROP TABLE IF EXISTS tableSize4") + sql("DROP TABLE IF EXISTS tableSize5") + sql("DROP TABLE IF EXISTS tableSize6") + sql("DROP TABLE IF EXISTS tableSize7") + sql("DROP TABLE IF EXISTS tableSize8") + sql("DROP TABLE IF EXISTS tableSize9") + sql("DROP TABLE IF EXISTS tableSize10") + sql("DROP TABLE IF EXISTS tableSize11") + } + + test("get data size and index size after load data") { + sql("CREATE TABLE tableSize1 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize1 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + checkExistence(sql("DESCRIBE FORMATTED tableSize1"), true, CarbonCommonConstants.TABLE_DATA_SIZE) + checkExistence(sql("DESCRIBE FORMATTED tableSize1"), true, CarbonCommonConstants.TABLE_INDEX_SIZE) + val res1 = sql("DESCRIBE FORMATTED tableSize1").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) || + 
row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE)) + assert(res1.length == 2) + res1.foreach(row => assert(row.getString(1).trim.toLong > 0)) + } + + test("get data size and index size after major compaction") { + sql("CREATE TABLE tableSize2 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize2 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize2 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql("ALTER TABLE tableSize2 compact 'major'") + checkExistence(sql("DESCRIBE FORMATTED tableSize2"), true, CarbonCommonConstants.TABLE_DATA_SIZE) + checkExistence(sql("DESCRIBE FORMATTED tableSize2"), true, CarbonCommonConstants.TABLE_INDEX_SIZE) + val res2 = sql("DESCRIBE FORMATTED tableSize2").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) || + row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE)) + assert(res2.length == 2) + res2.foreach(row => assert(row.getString(1).trim.toLong > 0)) + } + + test("get data size and index size after minor compaction") { + sql("CREATE TABLE tableSize3 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS 
('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql("ALTER TABLE tableSize3 compact 'minor'") + checkExistence(sql("DESCRIBE FORMATTED tableSize3"), true, CarbonCommonConstants.TABLE_DATA_SIZE) + checkExistence(sql("DESCRIBE FORMATTED tableSize3"), true, CarbonCommonConstants.TABLE_INDEX_SIZE) + val res3 = sql("DESCRIBE FORMATTED tableSize3").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) || + row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE)) + assert(res3.length == 2) + res3.foreach(row => assert(row.getString(1).trim.toLong > 0)) + } + + test("get data size and index size after insert into") { + sql("CREATE TABLE tableSize4 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize4 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql("CREATE TABLE tableSize5 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'") + sql("INSERT INTO TABLE tableSize5 SELECT * FROM tableSize4") + checkExistence(sql("DESCRIBE FORMATTED tableSize5"), true, CarbonCommonConstants.TABLE_DATA_SIZE) + checkExistence(sql("DESCRIBE FORMATTED tableSize5"), true, CarbonCommonConstants.TABLE_INDEX_SIZE) + val res4 = sql("DESCRIBE FORMATTED tableSize5").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) || + row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE)) + assert(res4.length == 2) + res4.foreach(row => assert(row.getString(1).trim.toLong 
> 0)) + } + + test("get data size and index size after insert overwrite") { + sql("CREATE TABLE tableSize6 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize6 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + sql("CREATE TABLE tableSize7 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'") + sql("INSERT OVERWRITE TABLE tableSize7 SELECT * FROM tableSize6") + checkExistence(sql("DESCRIBE FORMATTED tableSize7"), true, CarbonCommonConstants.TABLE_DATA_SIZE) + checkExistence(sql("DESCRIBE FORMATTED tableSize7"), true, CarbonCommonConstants.TABLE_INDEX_SIZE) + val res5 = sql("DESCRIBE FORMATTED tableSize7").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) || + row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE)) + assert(res5.length == 2) + res5.foreach(row => assert(row.getString(1).trim.toLong > 0)) + } + + test("get data size and index size for empty table") { + sql("CREATE TABLE tableSize8 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'") + val res6 = sql("DESCRIBE FORMATTED tableSize8").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) || + row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE)) + assert(res6.length == 2) + res6.foreach(row => assert(row.getString(1).trim.toLong == 0)) + } + + test("get last update time for empty table") { + sql("CREATE TABLE tableSize9 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'") + val res7 = sql("DESCRIBE FORMATTED tableSize9").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.LAST_UPDATE_TIME)) + 
assert(res7.length == 1) + res7.foreach(row => assert(row.getString(1).trim.toLong == 0)) + } + + test("get last update time for unempty table") { + sql("CREATE TABLE tableSize10 (empno int, workgroupcategory string, deptno int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize10 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""") + + val res8 = sql("DESCRIBE FORMATTED tableSize10").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.LAST_UPDATE_TIME)) + assert(res8.length == 1) + res8.foreach(row => assert(row.getString(1).trim.toLong > 0)) + } + + test("index and datasize for update scenario") { + sql( + "CREATE TABLE tableSize11 (empno int, workgroupcategory string, deptno int, projectcode " + + "int, attendance int) STORED BY 'org.apache.carbondata.format'") + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE tableSize11 OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""".stripMargin) + val res9 = sql("DESCRIBE FORMATTED tableSize11").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) || + row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE)) + assert(res9.length == 2) + res9.foreach(row => assert(row.getString(1).trim.toLong > 0)) + sql("update tableSize11 set (empno) = (234)").show() + val res10 = sql("DESCRIBE FORMATTED tableSize11").collect() + .filter(row => row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) || + row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE)) + assert(res10.length == 2) + res10.foreach(row => assert(row.getString(1).trim.toLong > 0)) + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 8f6d19c..15ee4fb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -50,6 +50,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl; @@ -281,8 +282,7 @@ public final class CarbonDataMergerUtil { */ public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel, - long mergeLoadStartTime, CompactionType compactionType) { - + long mergeLoadStartTime, CompactionType compactionType) throws IOException { boolean tableStatusUpdationStatus = false; AbsoluteTableIdentifier absoluteTableIdentifier = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); @@ -325,7 +325,10 @@ public final class CarbonDataMergerUtil { loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS); long loadEnddate = CarbonUpdateUtil.readCurrentTime(); loadMetadataDetails.setLoadEndTime(loadEnddate); + CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(); loadMetadataDetails.setLoadName(mergedLoadNumber); + CarbonUtil + .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, mergedLoadNumber, carbonTable); 
loadMetadataDetails.setLoadStartTime(mergeLoadStartTime); loadMetadataDetails.setPartitionCount("0"); // if this is a major compaction then set the segment as major compaction.