[CARBONDATA-1626]add data size and index size in table status file

This closes #1435


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/589f126d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/589f126d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/589f126d

Branch: refs/heads/fgdatamap
Commit: 589f126dea872f54c2096c9572436bf10589b1ca
Parents: f22e614
Author: akashrn5 <akashnilu...@gmail.com>
Authored: Wed Oct 25 15:27:37 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Fri Nov 17 21:43:15 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  26 +++
 .../core/datastore/impl/FileFactory.java        |   2 +-
 .../core/statusmanager/LoadMetadataDetails.java |  18 ++
 .../apache/carbondata/core/util/CarbonUtil.java | 152 ++++++++++++++++
 .../core/util/path/CarbonTablePath.java         |   8 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |  11 +-
 .../CarbonDescribeFormattedCommand.scala        |  10 ++
 .../spark/sql/GetDataSizeAndIndexSizeTest.scala | 172 +++++++++++++++++++
 .../processing/merger/CarbonDataMergerUtil.java |   7 +-
 9 files changed, 398 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 0a7dfdd..762ef6d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1380,6 +1380,32 @@ public final class CarbonCommonConstants {
 
   public static final String AGGREGATIONDATAMAPSCHEMA = 
"AggregateDataMapHandler";
 
+  /*
+   * The total size of carbon data
+   */
+  public static final String CARBON_TOTAL_DATA_SIZE = "datasize";
+
+  /**
+   * The total size of carbon index
+   */
+  public static final String CARBON_TOTAL_INDEX_SIZE = "indexsize";
+
+  /**
+   * ENABLE_CALCULATE_DATA_INDEX_SIZE
+   */
+  @CarbonProperty public static final String ENABLE_CALCULATE_SIZE = 
"carbon.enable.calculate.size";
+
+  /**
+   * DEFAULT_ENABLE_CALCULATE_DATA_INDEX_SIZE
+   */
+  @CarbonProperty public static final String DEFAULT_ENABLE_CALCULATE_SIZE = 
"true";
+
+  public static final String TABLE_DATA_SIZE = "Table Data Size";
+
+  public static final String TABLE_INDEX_SIZE = "Table Index Size";
+
+  public static final String LAST_UPDATE_TIME = "Last Update Time";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 57a48ec..240253d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -541,7 +541,7 @@ public final class FileFactory {
    * @param fileType
    * @return updated file path without url for local
    */
-  private static String getUpdatedFilePath(String filePath, FileType fileType) 
{
+  public static String getUpdatedFilePath(String filePath, FileType fileType) {
     switch (fileType) {
       case HDFS:
       case ALLUXIO:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index d838e2e..b282d53 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -41,6 +41,24 @@ public class LoadMetadataDetails implements Serializable {
   private String partitionCount;
 
   private String isDeleted = CarbonCommonConstants.KEYWORD_FALSE;
+  private String dataSize;
+  private String indexSize;
+
+  public String getDataSize() {
+    return dataSize;
+  }
+
+  public void setDataSize(String dataSize) {
+    this.dataSize = dataSize;
+  }
+
+  public String getIndexSize() {
+    return indexSize;
+  }
+
+  public void setIndexSize(String indexSize) {
+    this.indexSize = indexSize;
+  }
 
   // update delta end timestamp
   private String updateDeltaEndTimestamp = "";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 3c177dc..9d6acb6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -58,6 +58,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
+import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -67,6 +68,7 @@ import 
org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -77,6 +79,9 @@ import 
org.apache.carbondata.core.reader.ThriftReader.TBaseCreator;
 import org.apache.carbondata.core.scan.model.QueryDimension;
 import org.apache.carbondata.core.service.CarbonCommonFactory;
 import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -86,8 +91,11 @@ import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
 
 import com.google.gson.Gson;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TBase;
@@ -2140,5 +2148,149 @@ public final class CarbonUtil {
     return parentPath.toString() + CarbonCommonConstants.FILE_SEPARATOR + 
carbonTableIdentifier
         .getTableName();
   }
+
+  /*
+   * This method will add data size and index size into tablestatus for each 
segment
+   */
+  public static void addDataIndexSizeIntoMetaEntry(LoadMetadataDetails 
loadMetadataDetails,
+      String segmentId, CarbonTable carbonTable) throws IOException {
+    CarbonTablePath carbonTablePath =
+        
CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
+    Map<String, Long> dataIndexSize =
+        CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId);
+    loadMetadataDetails
+        
.setDataSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString());
+    loadMetadataDetails
+        
.setIndexSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString());
+  }
+
+  /**
+   * This method will calculate the data size and index size for carbon table
+   */
+  public static Map<String, Long> calculateDataIndexSize(CarbonTable 
carbonTable)
+      throws IOException {
+    Map<String, Long> dataIndexSizeMap = new HashMap<String, Long>();
+    long dataSize = 0L;
+    long indexSize = 0L;
+    long lastUpdateTime = 0L;
+    boolean needUpdate = false;
+    AbsoluteTableIdentifier absoluteTableIdentifier = 
carbonTable.getAbsoluteTableIdentifier();
+    CarbonTablePath carbonTablePath = 
CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
+    String isCalculated = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_CALCULATE_SIZE,
+            CarbonCommonConstants.DEFAULT_ENABLE_CALCULATE_SIZE);
+    if (isCalculated.equalsIgnoreCase("true")) {
+      SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+      ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+      try {
+        if (carbonLock.lockWithRetries()) {
+          LOGGER.info("Acquired lock for table for table status updation");
+          String metadataPath = carbonTable.getMetaDataFilepath();
+          LoadMetadataDetails[] loadMetadataDetails =
+              SegmentStatusManager.readLoadMetadata(metadataPath);
+
+          for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+            SegmentStatus loadStatus = loadMetadataDetail.getSegmentStatus();
+            if (loadStatus == SegmentStatus.SUCCESS || loadStatus ==
+                      SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+              String dsize = loadMetadataDetail.getDataSize();
+              String isize = loadMetadataDetail.getIndexSize();
+              // If it is old segment, need to calculate data size and index 
size again
+              if (null == dsize || null == isize) {
+                needUpdate = true;
+                LOGGER.info("It is an old segment, need calculate data size 
and index size again");
+                HashMap<String, Long> map = CarbonUtil
+                    .getDataSizeAndIndexSize(carbonTablePath, 
loadMetadataDetail.getLoadName());
+                dsize = 
String.valueOf(map.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE));
+                isize = 
String.valueOf(map.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE));
+                loadMetadataDetail.setDataSize(dsize);
+                loadMetadataDetail.setIndexSize(isize);
+              }
+              dataSize += Long.parseLong(dsize);
+              indexSize += Long.parseLong(isize);
+            }
+          }
+          // If it contains old segment, write new load details
+          if (needUpdate) {
+            
SegmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(),
+                loadMetadataDetails);
+          }
+          String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+          if (FileFactory.isFileExist(tableStatusPath, 
FileFactory.getFileType(tableStatusPath))) {
+            lastUpdateTime =
+                FileFactory.getCarbonFile(tableStatusPath, 
FileFactory.getFileType(tableStatusPath))
+                    .getLastModifiedTime();
+          }
+          dataIndexSizeMap
+              
.put(String.valueOf(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE), dataSize);
+          dataIndexSizeMap
+              
.put(String.valueOf(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE), indexSize);
+          dataIndexSizeMap
+              .put(String.valueOf(CarbonCommonConstants.LAST_UPDATE_TIME), 
lastUpdateTime);
+        } else {
+          LOGGER.error("Not able to acquire the lock for Table status updation 
for table");
+        }
+      } finally {
+        if (carbonLock.unlock()) {
+          LOGGER.info("Table unlocked successfully after table status 
updation");
+        } else {
+          LOGGER.error("Unable to unlock Table lock for table during table 
status updation");
+        }
+      }
+    }
+    return dataIndexSizeMap;
+  }
+
+  // Get the total size of carbon data and the total size of carbon index
+  public static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath 
carbonTablePath,
+      String segmentId) throws IOException {
+    long carbonDataSize = 0L;
+    long carbonIndexSize = 0L;
+    HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", 
segmentId);
+    FileFactory.FileType fileType = FileFactory.getFileType(segmentPath);
+    switch (fileType) {
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+      case S3:
+        Path path = new Path(segmentPath);
+        FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
+        FileStatus[] fileStatuses = fs.listStatus(path);
+        if (null != fileStatuses) {
+          for (FileStatus dataAndIndexStatus : fileStatuses) {
+            String pathName = dataAndIndexStatus.getPath().getName();
+            if (pathName.endsWith(CarbonTablePath.getCarbonIndexExtension()) 
|| pathName
+                .endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) {
+              carbonIndexSize += dataAndIndexStatus.getLen();
+            } else if 
(pathName.endsWith(CarbonTablePath.getCarbonDataExtension())) {
+              carbonDataSize += dataAndIndexStatus.getLen();
+            }
+          }
+        }
+        break;
+      case LOCAL:
+      default:
+        segmentPath = FileFactory.getUpdatedFilePath(segmentPath, fileType);
+        File file = new File(segmentPath);
+        File[] segmentFiles = file.listFiles();
+        if (null != segmentFiles) {
+          for (File dataAndIndexFile : segmentFiles) {
+            if (dataAndIndexFile.getCanonicalPath()
+                .endsWith(CarbonTablePath.getCarbonIndexExtension()) || 
dataAndIndexFile
+                
.getCanonicalPath().endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) {
+              carbonIndexSize += FileUtils.sizeOf(dataAndIndexFile);
+            } else if (dataAndIndexFile.getCanonicalPath()
+                .endsWith(CarbonTablePath.getCarbonDataExtension())) {
+              carbonDataSize += FileUtils.sizeOf(dataAndIndexFile);
+            }
+          }
+        }
+    }
+    dataAndIndexSize.put(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE, 
carbonDataSize);
+    dataAndIndexSize.put(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE, 
carbonIndexSize);
+    return dataAndIndexSize;
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index d363ac3..376a71f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -681,6 +681,14 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   *
+   * @return carbon index merge file extension
+   */
+  public static String getCarbonMergeIndexExtension() {
+    return MERGE_INDEX_FILE_EXT;
+  }
+
+  /**
    * This method will remove strings in path and return short block id
    *
    * @param blockId

http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index e32c407..7dad243 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -55,11 +55,8 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{LoadTablePostExecutionEvent, 
OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.processing.loading.csvinput.BlockDetails
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
-import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
-import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, 
CSVInputFormat, StringArrayWritable}
+import 
org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, 
NoRetryException}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, 
CarbonLoadModel}
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, 
CarbonDataMergerUtil, CompactionType}
@@ -292,6 +289,7 @@ object CarbonDataRDDFactory {
     var executorMessage: String = ""
     val isSortTable = carbonTable.getNumberOfSortColumns > 0
     val sortScope = 
CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
+
     try {
       if (updateModel.isDefined) {
         res = loadDataFrameForUpdate(
@@ -748,6 +746,7 @@ object CarbonDataRDDFactory {
       loadStatus: SegmentStatus,
       overwriteTable: Boolean
   ): Boolean = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val metadataDetails = if (status != null && status(0) != null) {
       status(0)._2._1
     } else {
@@ -758,6 +757,8 @@ object CarbonDataRDDFactory {
       loadStatus,
       carbonLoadModel.getFactTimeStamp,
       true)
+    CarbonUtil
+      .addDataIndexSizeIntoMetaEntry(metadataDetails, 
carbonLoadModel.getSegmentId, carbonTable)
     val done = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, 
carbonLoadModel, false,
       overwriteTable)
     if (!done) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
index b233c99..b61078b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala
@@ -30,6 +30,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.CarbonUtil
 
 private[sql] case class CarbonDescribeFormattedCommand(
     child: SparkPlan,
@@ -112,6 +113,15 @@ private[sql] case class CarbonDescribeFormattedCommand(
       .getOrDefault(CarbonCommonConstants.TABLE_COMMENT, "")
     results ++= Seq(("Comment: ", tableComment, ""))
     results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " 
MB", ""))
+    val dataIndexSize = CarbonUtil.calculateDataIndexSize(carbonTable)
+    if (!dataIndexSize.isEmpty) {
+      results ++= Seq((CarbonCommonConstants.TABLE_DATA_SIZE + ":",
+        
dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString, ""))
+      results ++= Seq((CarbonCommonConstants.TABLE_INDEX_SIZE + ":",
+        
dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString, ""))
+      results ++= Seq((CarbonCommonConstants.LAST_UPDATE_TIME + ":",
+        dataIndexSize.get(CarbonCommonConstants.LAST_UPDATE_TIME).toString, 
""))
+    }
     results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
       .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
       .LOAD_SORT_SCOPE_DEFAULT), 
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/integration/spark2/src/test/scala/org/apache/spark/sql/GetDataSizeAndIndexSizeTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/sql/GetDataSizeAndIndexSizeTest.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/sql/GetDataSizeAndIndexSizeTest.scala
new file mode 100644
index 0000000..03ec3a1
--- /dev/null
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/sql/GetDataSizeAndIndexSizeTest.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.scalatest.BeforeAndAfterAll
+
+class GetDataSizeAndIndexSizeTest extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS tableSize1")
+    sql("DROP TABLE IF EXISTS tableSize2")
+    sql("DROP TABLE IF EXISTS tableSize3")
+    sql("DROP TABLE IF EXISTS tableSize4")
+    sql("DROP TABLE IF EXISTS tableSize5")
+    sql("DROP TABLE IF EXISTS tableSize6")
+    sql("DROP TABLE IF EXISTS tableSize7")
+    sql("DROP TABLE IF EXISTS tableSize8")
+    sql("DROP TABLE IF EXISTS tableSize9")
+    sql("DROP TABLE IF EXISTS tableSize10")
+    sql("DROP TABLE IF EXISTS tableSize11")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS tableSize1")
+    sql("DROP TABLE IF EXISTS tableSize2")
+    sql("DROP TABLE IF EXISTS tableSize3")
+    sql("DROP TABLE IF EXISTS tableSize4")
+    sql("DROP TABLE IF EXISTS tableSize5")
+    sql("DROP TABLE IF EXISTS tableSize6")
+    sql("DROP TABLE IF EXISTS tableSize7")
+    sql("DROP TABLE IF EXISTS tableSize8")
+    sql("DROP TABLE IF EXISTS tableSize9")
+    sql("DROP TABLE IF EXISTS tableSize10")
+    sql("DROP TABLE IF EXISTS tableSize11")
+  }
+
+  test("get data size and index size after load data") {
+    sql("CREATE TABLE tableSize1 (empno int, workgroupcategory string, deptno 
int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize1 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    checkExistence(sql("DESCRIBE FORMATTED tableSize1"), true, 
CarbonCommonConstants.TABLE_DATA_SIZE)
+    checkExistence(sql("DESCRIBE FORMATTED tableSize1"), true, 
CarbonCommonConstants.TABLE_INDEX_SIZE)
+    val res1 = sql("DESCRIBE FORMATTED tableSize1").collect()
+      .filter(row => 
row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) ||
+      row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE))
+    assert(res1.length == 2)
+    res1.foreach(row => assert(row.getString(1).trim.toLong > 0))
+  }
+
+  test("get data size and index size after major compaction") {
+    sql("CREATE TABLE tableSize2 (empno int, workgroupcategory string, deptno 
int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize2 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize2 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    sql("ALTER TABLE tableSize2 compact 'major'")
+    checkExistence(sql("DESCRIBE FORMATTED tableSize2"), true, 
CarbonCommonConstants.TABLE_DATA_SIZE)
+    checkExistence(sql("DESCRIBE FORMATTED tableSize2"), true, 
CarbonCommonConstants.TABLE_INDEX_SIZE)
+    val res2 = sql("DESCRIBE FORMATTED tableSize2").collect()
+      .filter(row => 
row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) ||
+        row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE))
+    assert(res2.length == 2)
+    res2.foreach(row => assert(row.getString(1).trim.toLong > 0))
+  }
+
+  test("get data size and index size after minor compaction") {
+    sql("CREATE TABLE tableSize3 (empno int, workgroupcategory string, deptno 
int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize3 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    sql("ALTER TABLE tableSize3 compact 'minor'")
+    checkExistence(sql("DESCRIBE FORMATTED tableSize3"), true, 
CarbonCommonConstants.TABLE_DATA_SIZE)
+    checkExistence(sql("DESCRIBE FORMATTED tableSize3"), true, 
CarbonCommonConstants.TABLE_INDEX_SIZE)
+    val res3 = sql("DESCRIBE FORMATTED tableSize3").collect()
+      .filter(row => 
row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) ||
+        row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE))
+    assert(res3.length == 2)
+    res3.foreach(row => assert(row.getString(1).trim.toLong > 0))
+  }
+
+  test("get data size and index size after insert into") {
+    sql("CREATE TABLE tableSize4 (empno int, workgroupcategory string, deptno 
int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize4 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    sql("CREATE TABLE tableSize5 (empno int, workgroupcategory string, deptno 
int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    sql("INSERT INTO TABLE tableSize5 SELECT * FROM tableSize4")
+    checkExistence(sql("DESCRIBE FORMATTED tableSize5"), true, 
CarbonCommonConstants.TABLE_DATA_SIZE)
+    checkExistence(sql("DESCRIBE FORMATTED tableSize5"), true, 
CarbonCommonConstants.TABLE_INDEX_SIZE)
+    val res4 = sql("DESCRIBE FORMATTED tableSize5").collect()
+      .filter(row => 
row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) ||
+        row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE))
+    assert(res4.length == 2)
+    res4.foreach(row => assert(row.getString(1).trim.toLong > 0))
+  }
+
+  test("get data size and index size after insert overwrite") {
+    sql("CREATE TABLE tableSize6 (empno int, workgroupcategory string, deptno 
int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize6 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+    sql("CREATE TABLE tableSize7 (empno int, workgroupcategory string, deptno 
int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    sql("INSERT OVERWRITE TABLE tableSize7 SELECT * FROM tableSize6")
+    checkExistence(sql("DESCRIBE FORMATTED tableSize7"), true, 
CarbonCommonConstants.TABLE_DATA_SIZE)
+    checkExistence(sql("DESCRIBE FORMATTED tableSize7"), true, 
CarbonCommonConstants.TABLE_INDEX_SIZE)
+    val res5 = sql("DESCRIBE FORMATTED tableSize7").collect()
+      .filter(row => 
row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) ||
+        row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE))
+    assert(res5.length == 2)
+    res5.foreach(row => assert(row.getString(1).trim.toLong > 0))
+  }
+
+  test("get data size and index size for empty table") {
+    sql("CREATE TABLE tableSize8 (empno int, workgroupcategory string, deptno 
int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    val res6 = sql("DESCRIBE FORMATTED tableSize8").collect()
+      .filter(row => 
row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) ||
+        row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE))
+    assert(res6.length == 2)
+    res6.foreach(row => assert(row.getString(1).trim.toLong == 0))
+  }
+
+  test("get last update time for empty table") {
+    sql("CREATE TABLE tableSize9 (empno int, workgroupcategory string, deptno 
int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    val res7 = sql("DESCRIBE FORMATTED tableSize9").collect()
+      .filter(row => 
row.getString(0).contains(CarbonCommonConstants.LAST_UPDATE_TIME))
+    assert(res7.length == 1)
+    res7.foreach(row => assert(row.getString(1).trim.toLong == 0))
+  }
+
+  test("get last update time for unempty table") {
+    sql("CREATE TABLE tableSize10 (empno int, workgroupcategory string, deptno 
int, projectcode int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize10 OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"', 'FILEHEADER'='')""")
+
+    val res8 = sql("DESCRIBE FORMATTED tableSize10").collect()
+      .filter(row => 
row.getString(0).contains(CarbonCommonConstants.LAST_UPDATE_TIME))
+    assert(res8.length == 1)
+    res8.foreach(row => assert(row.getString(1).trim.toLong > 0))
+  }
+
+  test("index and datasize for update scenario") {
+    sql(
+      "CREATE TABLE tableSize11 (empno int, workgroupcategory string, deptno 
int, projectcode " +
+      "int, attendance int) STORED BY 'org.apache.carbondata.format'")
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
tableSize11 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"', 
'FILEHEADER'='')""".stripMargin)
+    val res9 = sql("DESCRIBE FORMATTED tableSize11").collect()
+      .filter(row => 
row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) ||
+                     
row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE))
+    assert(res9.length == 2)
+    res9.foreach(row => assert(row.getString(1).trim.toLong > 0))
+    sql("update tableSize11 set (empno) = (234)").show()
+    val res10 = sql("DESCRIBE FORMATTED tableSize11").collect()
+      .filter(row => 
row.getString(0).contains(CarbonCommonConstants.TABLE_DATA_SIZE) ||
+                     
row.getString(0).contains(CarbonCommonConstants.TABLE_INDEX_SIZE))
+    assert(res10.length == 2)
+    res10.foreach(row => assert(row.getString(1).trim.toLong > 0))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/589f126d/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 8f6d19c..15ee4fb 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -50,6 +50,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
@@ -281,8 +282,7 @@ public final class CarbonDataMergerUtil {
    */
   public static boolean 
updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
       String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel 
carbonLoadModel,
-      long mergeLoadStartTime, CompactionType compactionType) {
-
+      long mergeLoadStartTime, CompactionType compactionType) throws 
IOException {
     boolean tableStatusUpdationStatus = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -325,7 +325,10 @@ public final class CarbonDataMergerUtil {
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
         long loadEnddate = CarbonUpdateUtil.readCurrentTime();
         loadMetadataDetails.setLoadEndTime(loadEnddate);
+        CarbonTable carbonTable = 
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
         loadMetadataDetails.setLoadName(mergedLoadNumber);
+        CarbonUtil
+            .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, 
mergedLoadNumber, carbonTable);
         loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
         loadMetadataDetails.setPartitionCount("0");
         // if this is a major compaction then set the segment as major 
compaction.

Reply via email to