This is an automated email from the ASF dual-hosted git repository.

xubo245 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new be76a62  [CARBONDATA-3578] Make table status file smaller
be76a62 is described below

commit be76a624eec6516a09691de1e553c7a8a2976b3b
Author: Jacky Li <[email protected]>
AuthorDate: Wed Nov 13 19:05:00 2019 +0800

    [CARBONDATA-3578] Make table status file smaller
    
    Currently, each segment entry in the table status file occupies 347 bytes; 
if one has 10000 segments, the file becomes 3.47 MB.
    Since carbondata relies on this file heavily, it is better to reduce its 
size to improve IO, especially in data lake scenario.
    
    Each entry in table status file is one LoadMetadataDetails object.
    In this PR, following changes are made in LoadMetadataDetails to reduce its 
size:
    
    Do not write fields that have default values, like "visibility", 
"fileFormat", etc.
    Use shorter keys; for example, "loadStatus" is changed to "ls".
    In this PR, table status file size is reduced to 1/3.
    Before change: 347 bytes
    
         {
             "timestamp": "1573635015982",
             "loadStatus": "Success",
             "loadName": "0",
             "partitionCount": "0",
             "isDeleted": "FALSE",
             "dataSize": "2977",
             "indexSize": "1469",
             "updateDeltaEndTimestamp": "",
             "updateDeltaStartTimestamp": "",
             "updateStatusFileName": "",
             "loadStartTime": "1573635014638",
             "visibility": "true",
             "fileFormat": "columnar_v3",
             "segmentFile": "0_1573635014638.segment"
         }
    After change: 118 bytes (reduced to 1/3 of the original size)
    
         {
             "ts": "1573635284677",
             "ls": "S",         -- stands for Success
             "ln": "0",
             "ds": "2977",
             "is": "1469",
             "lt": "1573635284045",
             "sf": "0_1573635284045.segment"
         }
    Regarding backward compatibility: this PR can still read the old table 
status file by using GSON's @SerializedName(alternate), so backward 
compatibility is not broken.
    
    This closes #3449
---
 .../carbondata/core/mutate/CarbonUpdateUtil.java   |   1 -
 .../core/statusmanager/LoadMetadataDetails.java    | 139 +++++++++++++--------
 .../core/statusmanager/SegmentStatus.java          |   4 +-
 .../core/statusmanager/SegmentStatusManager.java   |  10 ++
 .../spark/rdd/NewCarbonDataLoadRDD.scala           |   3 -
 .../spark/rdd/CarbonDataRDDFactory.scala           |   1 -
 .../processing/merger/CarbonDataMergerUtil.java    |   2 -
 .../streaming/segment/StreamSegment.java           |   4 +-
 8 files changed, 101 insertions(+), 63 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java 
b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 5bccdd6..8155825 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -248,7 +248,6 @@ public class CarbonUpdateUtil {
               // if this call is coming from the delete delta flow then the 
time stamp
               // String will come empty then no need to write into table 
status file.
               if (isTimestampUpdationRequired) {
-                loadMetadata.setIsDeleted(CarbonCommonConstants.KEYWORD_TRUE);
                 // if in case of update flow.
                 if (loadMetadata.getUpdateDeltaStartTimestamp().isEmpty()) {
                   // this means for first time it is getting updated .
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index f160b49..c04f219 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -25,6 +25,8 @@ import java.util.Date;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 
+import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 /*
@@ -53,99 +55,106 @@ import org.apache.log4j.Logger;
 public class LoadMetadataDetails implements Serializable {
 
   private static final long serialVersionUID = 1106104914918491724L;
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(LoadMetadataDetails.class.getName());
+
+  // don't remove static as the write will fail.
+  private static final SimpleDateFormat parser =
+      new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
+
+  @SerializedName(value = "ts", alternate = "timestamp")
   private String timestamp;
 
   // For backward compatibility, this member is required to read from JSON in 
the table_status file
+  @SerializedName(value = "ls", alternate = "loadStatus")
   private SegmentStatus loadStatus;
 
   // name of the segment
+  @SerializedName(value = "ln", alternate = "loadName")
   private String loadName;
 
-  // partition count of this segment
-  private String partitionCount;
-
-  private String isDeleted = CarbonCommonConstants.KEYWORD_FALSE;
+  @SerializedName(value = "ds", alternate = "dataSize")
   private String dataSize;
-  private String indexSize;
-
-  public String getDataSize() {
-    return dataSize;
-  }
-
-  public void setDataSize(String dataSize) {
-    this.dataSize = dataSize;
-  }
 
-  public String getIndexSize() {
-    return indexSize;
-  }
-
-  public void setIndexSize(String indexSize) {
-    this.indexSize = indexSize;
-  }
+  @SerializedName(value = "is", alternate = "indexSize")
+  private String indexSize;
 
   // update delta end timestamp
-  private String updateDeltaEndTimestamp = "";
+  @SerializedName(value = "ue", alternate = "updateDeltaEndTimestamp")
+  private String updateDeltaEndTimestamp;
 
   // update delta start timestamp
-  private String updateDeltaStartTimestamp = "";
+  @SerializedName(value = "us", alternate = "updateDeltaStartTimestamp")
+  private String updateDeltaStartTimestamp;
 
   // this will represent the update status file name at that point of time.
-  private String updateStatusFileName = "";
-
-  /**
-   * LOGGER
-   */
-  private static final Logger LOGGER =
-      LogServiceFactory.getLogService(LoadMetadataDetails.class.getName());
+  @SerializedName(value = "uf", alternate = "updateStatusFileName")
+  private String updateStatusFileName;
 
-  // don't remove static as the write will fail.
-  private static final SimpleDateFormat parser =
-      new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
   /**
    * Segment modification or deletion time stamp
    */
+  @SerializedName(value = "mt", alternate = "modificationOrdeletionTimesStamp")
   private String modificationOrdeletionTimesStamp;
+
+  @SerializedName(value = "lt", alternate = "loadStartTime")
   private String loadStartTime;
 
+  @SerializedName(value = "mn", alternate = "mergedLoadName")
   private String mergedLoadName;
+
   /**
    * visibility is used to determine whether to the load is visible or not.
+   * by default it is true
    */
-  private String visibility = "true";
+  @SerializedName(value = "v", alternate = "visibility")
+  private String visibility;
 
   /**
    * To know if the segment is a major compacted segment or not.
    */
+  @SerializedName(value = "mc", alternate = "majorCompacted")
   private String majorCompacted;
 
   /**
-   * the file format of this segment
+   * the file format of this segment, by default it is FileFormat.COLUMNAR_V3
    */
-  private String fileFormat = FileFormat.COLUMNAR_V3.toString();
+  @SerializedName(value = "ff", alternate = "fileFormat")
+  private String fileFormat;
 
   /**
    * Segment path if the segment is added externally.
    */
+  @SerializedName(value = "p", alternate = "path")
   private String path;
 
   /**
    * Segment file name where it has the information of partition information.
    */
+  @SerializedName(value = "sf", alternate = "segmentFile")
   private String segmentFile;
 
-  public String getPartitionCount() {
-    return partitionCount;
-  }
-
   /**
    * extraInfo will contain segment mapping Information for datamap table
    */
+  @SerializedName(value = "ei", alternate = "extraInfo")
   private String extraInfo;
 
-  @Deprecated
-  public void setPartitionCount(String partitionCount) {
-    this.partitionCount = partitionCount;
+  public String getDataSize() {
+    return dataSize;
+  }
+
+  public void setDataSize(String dataSize) {
+    this.dataSize = dataSize;
+  }
+
+  public String getIndexSize() {
+    return indexSize;
+  }
+
+  public void setIndexSize(String indexSize) {
+    this.indexSize = indexSize;
   }
 
   public long getLoadEndTime() {
@@ -336,6 +345,9 @@ public class LoadMetadataDetails implements Serializable {
    * @return the visibility
    */
   public String getVisibility() {
+    if (visibility == null) {
+      return "true";
+    }
     return visibility;
   }
 
@@ -364,20 +376,14 @@ public class LoadMetadataDetails implements Serializable {
   }
 
   /**
-   * To set isDeleted property.
-   *
-   * @param isDeleted
-   */
-  public void setIsDeleted(String isDeleted) {
-    this.isDeleted = isDeleted;
-  }
-
-  /**
    * To get the update delta end timestamp
    *
    * @return updateDeltaEndTimestamp
    */
   public String getUpdateDeltaEndTimestamp() {
+    if (updateDeltaEndTimestamp == null) {
+      return "";
+    }
     return updateDeltaEndTimestamp;
   }
 
@@ -396,6 +402,9 @@ public class LoadMetadataDetails implements Serializable {
    * @return updateDeltaStartTimestamp
    */
   public String getUpdateDeltaStartTimestamp() {
+    if (updateDeltaStartTimestamp == null) {
+      return "";
+    }
     return updateDeltaStartTimestamp;
   }
 
@@ -414,6 +423,9 @@ public class LoadMetadataDetails implements Serializable {
    * @return updateStatusFileName
    */
   public String getUpdateStatusFileName() {
+    if (updateStatusFileName == null) {
+      return "";
+    }
     return updateStatusFileName;
   }
 
@@ -427,6 +439,9 @@ public class LoadMetadataDetails implements Serializable {
   }
 
   public FileFormat getFileFormat() {
+    if (fileFormat == null) {
+      return FileFormat.COLUMNAR_V3;
+    }
     return new FileFormat(fileFormat);
   }
 
@@ -468,4 +483,26 @@ public class LoadMetadataDetails implements Serializable {
     return getFileFormat().equals(FileFormat.COLUMNAR_V3)
         || getFileFormat().equals(FileFormat.ROW_V1);
   }
+
+  /**
+   * Before writing table status file, call this to make the metadata smaller.
+   * It checks if fields are default value, then make it null so GSON does not 
write it
+   */
+  void removeUnnecessaryField() {
+    if (StringUtils.isEmpty(updateDeltaEndTimestamp)) {
+      updateDeltaEndTimestamp = null;
+    }
+    if (StringUtils.isEmpty(updateDeltaStartTimestamp)) {
+      updateDeltaStartTimestamp = null;
+    }
+    if (StringUtils.isEmpty(updateStatusFileName)) {
+      updateStatusFileName = null;
+    }
+    if (StringUtils.isEmpty(visibility) || visibility.equals("true")) {
+      visibility = null;
+    }
+    if (StringUtils.isEmpty(fileFormat) || 
fileFormat.equals(FileFormat.COLUMNAR_V3.toString())) {
+      fileFormat = null;
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
index 3b62ec9..eb6a368 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
@@ -28,7 +28,7 @@ public enum SegmentStatus {
   /**
    * Data load success, it is visible for read
    */
-  @SerializedName("Success")
+  @SerializedName(value = "S", alternate = "Success")
   SUCCESS("Success"),
 
   /**
@@ -58,7 +58,7 @@ public enum SegmentStatus {
   /**
    * Segment is compacted
    */
-  @SerializedName("Compacted")
+  @SerializedName(value = "C", alternate = "Compacted")
   COMPACTED("Compacted"),
 
   /**
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 5fbcb28..0804170 100755
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -546,6 +546,11 @@ public class SegmentStatusManager {
       brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
               Charset.forName(DEFAULT_CHARSET)));
 
+      // make the table status file smaller by removing fields that are 
default value
+      for (LoadMetadataDetails loadMetadataDetails : 
listOfLoadFolderDetailsArray) {
+        loadMetadataDetails.removeUnnecessaryField();
+      }
+
       String metadataInstance = 
gsonObjectToWrite.toJson(listOfLoadFolderDetailsArray);
       brWriter.write(metadataInstance);
     } catch (IOException ioe) {
@@ -767,9 +772,11 @@ public class SegmentStatusManager {
       this.listOfStreamSegments = listOfStreamSegments;
       this.listOfInProgressSegments = listOfInProgressSegments;
     }
+
     public List<Segment> getInvalidSegments() {
       return listOfInvalidSegments;
     }
+
     public List<Segment> getValidSegments() {
       return listOfValidSegments;
     }
@@ -917,6 +924,9 @@ public class SegmentStatusManager {
       brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
           Charset.forName(DEFAULT_CHARSET)));
 
+      // make the table status file smaller by removing fields that are 
default value
+      
listOfLoadFolderDetails.forEach(LoadMetadataDetails::removeUnnecessaryField);
+
       String metadataInstance = 
gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
       brWriter.write(metadataInstance);
     } catch (IOException ie) {
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index ce60a55..4d42446 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -128,7 +128,6 @@ class NewCarbonDataLoadRDD[K, V](
       val uniqueLoadStatusId =
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + 
theSplit.index
       try {
-        
loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PARTITION_ID)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
 
         val preFetch = 
CarbonProperties.getInstance().getProperty(CarbonCommonConstants
@@ -259,7 +258,6 @@ class NewDataFrameLoaderRDD[K, V](
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + 
theSplit.index
       try {
 
-        
loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PARTITION_ID)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
         carbonLoadModel.setPreFetch(false)
@@ -470,7 +468,6 @@ class PartitionTableDataLoaderRDD[K, V](
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + 
theSplit.index
       try {
 
-        
loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PARTITION_ID)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
         
carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index)))
         carbonLoadModel.setPreFetch(false)
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 488468a..6767a5c 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -781,7 +781,6 @@ object CarbonDataRDDFactory {
                              CarbonCommonConstants.UNDERSCORE +
                              (index + "_0")
 
-        
loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PARTITION_ID)
         loadMetadataDetails.setLoadName(segId)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
         carbonLoadModel.setSegmentId(segId)
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index f0abf44..c3a30f7 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -340,7 +340,6 @@ public final class CarbonDataMergerUtil {
 
         // create entry for merged one.
         LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-        
loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PARTITION_ID);
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
         long loadEnddate = CarbonUpdateUtil.readCurrentTime();
         loadMetadataDetails.setLoadEndTime(loadEnddate);
@@ -350,7 +349,6 @@ public final class CarbonDataMergerUtil {
         CarbonLoaderUtil
             .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, 
mergedLoadNumber, carbonTable);
         
loadMetadataDetails.setLoadStartTime(carbonLoadModel.getFactTimeStamp());
-        loadMetadataDetails.setPartitionCount("0");
         // if this is a major compaction then set the segment as major 
compaction.
         if (CompactionType.MAJOR == compactionType) {
           loadMetadataDetails.setMajorCompacted("true");
diff --git 
a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
 
b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 6211d0c..b0332fd 100644
--- 
a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ 
b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -96,7 +96,6 @@ public class StreamSegment {
         if (null == streamSegment) {
           int segmentId = SegmentStatusManager.createNewSegmentId(details);
           LoadMetadataDetails newDetail = new LoadMetadataDetails();
-          newDetail.setPartitionCount("0");
           newDetail.setLoadName("" + segmentId);
           newDetail.setFileFormat(FileFormat.ROW_V1);
           newDetail.setLoadStartTime(System.currentTimeMillis());
@@ -159,8 +158,7 @@ public class StreamSegment {
 
         int newSegmentId = SegmentStatusManager.createNewSegmentId(details);
         LoadMetadataDetails newDetail = new LoadMetadataDetails();
-        newDetail.setPartitionCount("0");
-        newDetail.setLoadName("" + newSegmentId);
+        newDetail.setLoadName(String.valueOf(newSegmentId));
         newDetail.setFileFormat(FileFormat.ROW_V1);
         newDetail.setLoadStartTime(System.currentTimeMillis());
         newDetail.setSegmentStatus(SegmentStatus.STREAMING);

Reply via email to