Repository: carbondata
Updated Branches:
  refs/heads/master bd6abbbff -> 7e93d7b87


[CARBONDATA-2803] Fix wrong data size calculation, refactor CarbonTableInputFormat
for better readability, and handle local dictionary for older tables

Changes in this PR:
1. The data size was calculated wrongly: the index map contains duplicate block
paths because it stores one entry per blocklet, so remove the duplicates and
keep unique block paths for a correct data size calculation (see the sketch
after this list).
2. Refactored code in CarbonTableInputFormat for better readability.
3. If the table properties contain a null value for the local dictionary enable
property, it is an old table, so put the value "false" into the properties map.
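
To illustrate change 1, a minimal, self-contained Java sketch (class name,
paths and sizes are hypothetical, not taken from the CarbonData codebase) of
why summing sizes over the raw blocklet entries inflates the data size, and
how de-duplicating the block paths fixes it:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DataSizeSketch {
  public static void main(String[] args) {
    // A carbonindex file has one entry per blocklet; here two blocklets
    // belong to the same block file, so the same path appears twice.
    List<String> blockletPaths = Arrays.asList(
        "/store/part-0-0_batchno0-0-0-1.carbondata",
        "/store/part-0-0_batchno0-0-0-1.carbondata");
    long blockSize = 1024L; // hypothetical block file size in bytes

    long wrong = blockletPaths.size() * blockSize; // 2048: block counted twice

    // The fix in SegmentFileStore follows the same idea: collect the paths
    // into a set first, then keep the de-duplicated paths as the block list.
    Set<String> uniqueBlocks = new LinkedHashSet<>(blockletPaths);
    long right = uniqueBlocks.size() * blockSize;  // 1024: one entry per block

    List<String> blocks = new ArrayList<>(uniqueBlocks);
    System.out.println("wrong=" + wrong + ", right=" + right + ", blocks=" + blocks);
  }
}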

This closes #2583


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7e93d7b8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7e93d7b8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7e93d7b8

Branch: refs/heads/master
Commit: 7e93d7b8707c36bf3f8d1f153b67a8cb997fa0f4
Parents: bd6abbb
Author: akashrn5 <akashnilu...@gmail.com>
Authored: Mon Jul 30 19:41:34 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Aug 2 17:00:05 2018 +0530

----------------------------------------------------------------------
 .../indexstore/blockletindex/BlockDataMap.java  |  2 +-
 .../core/metadata/SegmentFileStore.java         | 23 ++------
 .../core/metadata/schema/table/CarbonTable.java | 10 ++--
 .../hadoop/api/CarbonTableInputFormat.java      | 61 ++++++++++++--------
 .../FlatFolderTableLoadingTestCase.scala        | 31 ++++++++++
 5 files changed, 81 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e93d7b8/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index f4bb58e..0875e75 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -588,7 +588,7 @@ public class BlockDataMap extends CoarseGrainDataMap
 
   private boolean useMinMaxForExecutorPruning(FilterResolverIntf filterResolverIntf) {
     boolean useMinMaxForPruning = false;
-    if (this instanceof BlockletDataMap) {
+    if (!isLegacyStore && this instanceof BlockletDataMap) {
       useMinMaxForPruning = BlockletDataMapUtil
           .useMinMaxForBlockletPruning(filterResolverIntf, getMinMaxCacheColumns());
     }
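
A brief note on the hunk above: the new !isLegacyStore guard short-circuits
before the instanceof check, presumably because legacy stores do not carry the
min/max cache-column metadata that executor-side pruning relies on. A
stand-alone sketch of the same guard shape (hypothetical class, not the real
DataMap hierarchy):

public class LegacyGuardSketch {
  private final boolean isLegacyStore;
  private final boolean hasMinMaxCacheColumns;

  public LegacyGuardSketch(boolean isLegacyStore, boolean hasMinMaxCacheColumns) {
    this.isLegacyStore = isLegacyStore;
    this.hasMinMaxCacheColumns = hasMinMaxCacheColumns;
  }

  // Check the cheap legacy flag first so legacy stores never attempt
  // min/max based pruning at all.
  public boolean useMinMaxForExecutorPruning() {
    return !isLegacyStore && hasMinMaxCacheColumns;
  }

  public static void main(String[] args) {
    System.out.println(new LegacyGuardSketch(true, true).useMinMaxForExecutorPruning());  // false
    System.out.println(new LegacyGuardSketch(false, true).useMinMaxForExecutorPruning()); // true
  }
}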

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e93d7b8/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 111e444..1acf0ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -16,22 +16,9 @@
  */
 package org.apache.carbondata.core.metadata;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
+import java.io.*;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -511,11 +498,13 @@ public class SegmentFileStore {
     for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
       List<DataFileFooter> indexInfo =
           fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
-      List<String> blocks = new ArrayList<>();
+      // carbonindex file stores blocklets so block filename will be duplicated, use set to remove
+      // duplicates
+      Set<String> blocks = new LinkedHashSet<>();
       for (DataFileFooter footer : indexInfo) {
         blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
       }
-      indexFilesMap.put(entry.getKey(), blocks);
+      indexFilesMap.put(entry.getKey(), new ArrayList<>(blocks));
       boolean added = false;
       for (Map.Entry<String, List<String>> mergeFile : indexFileStore
           .getCarbonMergeFileToIndexFilesMap().entrySet()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e93d7b8/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 850a791..14052f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1160,10 +1160,11 @@ public class CarbonTable implements Serializable {
    * @param tableInfo
    */
   private static void setLocalDictInfo(CarbonTable table, TableInfo tableInfo) {
-    String isLocalDictionaryEnabled = tableInfo.getFactTable().getTableProperties()
-        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE);
-    String localDictionaryThreshold = tableInfo.getFactTable().getTableProperties()
-        .get(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD);
+    Map<String, String> tableProperties = tableInfo.getFactTable().getTableProperties();
+    String isLocalDictionaryEnabled =
+        tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE);
+    String localDictionaryThreshold =
+        tableProperties.get(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD);
     if (null != isLocalDictionaryEnabled) {
       table.setLocalDictionaryEnabled(Boolean.parseBoolean(isLocalDictionaryEnabled));
       if (null != localDictionaryThreshold) {
       if (null != localDictionaryThreshold) {
@@ -1176,6 +1177,7 @@ public class CarbonTable implements Serializable {
       // in case of old tables, local dictionary enable property will not be present in
       // tableProperties, so disable the local dictionary generation
       table.setLocalDictionaryEnabled(Boolean.parseBoolean("false"));
+      tableProperties.put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false");
     }
   }
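
To illustrate change 3 in isolation, a minimal sketch (the constant and the
map are stand-ins for the real CarbonCommonConstants and TableInfo plumbing)
of the backward-compatibility default: when the property is absent the table
predates local dictionary support, so generation is disabled and the default
is written back into the properties map for later readers:

import java.util.HashMap;
import java.util.Map;

public class LocalDictDefaultSketch {
  // Stand-in for CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE.
  static final String LOCAL_DICTIONARY_ENABLE = "local_dictionary_enable";

  public static void main(String[] args) {
    Map<String, String> tableProperties = new HashMap<>(); // old table: property missing
    String isLocalDictionaryEnabled = tableProperties.get(LOCAL_DICTIONARY_ENABLE);
    boolean localDictionaryEnabled;
    if (null != isLocalDictionaryEnabled) {
      localDictionaryEnabled = Boolean.parseBoolean(isLocalDictionaryEnabled);
    } else {
      // Old tables carry no such property: disable generation and record the
      // decision so downstream code sees an explicit value.
      localDictionaryEnabled = false;
      tableProperties.put(LOCAL_DICTIONARY_ENABLE, "false");
    }
    System.out.println(localDictionaryEnabled + " -> " + tableProperties);
  }
}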
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e93d7b8/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index f53a1d7..be72983 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -198,30 +198,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments), false,
             readCommittedScope);
     // Clean the updated segments from memory if the update happens on segments
-    List<Segment> toBeCleanedSegments = new ArrayList<>();
-    for (Segment filteredSegment : filteredSegmentToAccess) {
-      boolean refreshNeeded =
-          DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
-              .isRefreshNeeded(filteredSegment,
-                  updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
-      if (refreshNeeded) {
-        toBeCleanedSegments.add(filteredSegment);
-      }
-    }
-    // Clean segments if refresh is needed
-    for (Segment segment : filteredSegmentToAccess) {
-      if (DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
-          .isRefreshNeeded(segment.getSegmentNo())) {
-        toBeCleanedSegments.add(segment);
-      }
-    }
-
-
-    if (toBeCleanedSegments.size() > 0) {
-      DataMapStoreManager.getInstance()
-          .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
-              toBeCleanedSegments);
-    }
+    refreshSegmentCacheIfRequired(job, carbonTable, updateStatusManager, filteredSegmentToAccess);
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
@@ -266,6 +243,42 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   }
 
   /**
+   * Method to check and refresh segment cache
+   *
+   * @param job
+   * @param carbonTable
+   * @param updateStatusManager
+   * @param filteredSegmentToAccess
+   * @throws IOException
+   */
+  public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTable,
+      SegmentUpdateStatusManager updateStatusManager, List<Segment> filteredSegmentToAccess)
+      throws IOException {
+    List<Segment> toBeCleanedSegments = new ArrayList<>();
+    for (Segment filteredSegment : filteredSegmentToAccess) {
+      boolean refreshNeeded =
+          DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
+              .isRefreshNeeded(filteredSegment,
+                  updateStatusManager.getInvalidTimestampRange(filteredSegment.getSegmentNo()));
+      if (refreshNeeded) {
+        toBeCleanedSegments.add(filteredSegment);
+      }
+    }
+    // Clean segments if refresh is needed
+    for (Segment segment : filteredSegmentToAccess) {
+      if (DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
+          .isRefreshNeeded(segment.getSegmentNo())) {
+        toBeCleanedSegments.add(segment);
+      }
+    }
+    if (toBeCleanedSegments.size() > 0) {
+      DataMapStoreManager.getInstance()
+          .clearInvalidSegments(getOrCreateCarbonTable(job.getConfiguration()),
+              toBeCleanedSegments);
+    }
+  }
+
+  /**
    * Below method will be used to get the filter segments when query is fired on pre Aggregate
    * and main table in case of streaming.
    * For Pre Aggregate rules it will set all the valid segments for both streaming and

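A short usage note on the refactoring above: extracting the two loops into the
public refreshSegmentCacheIfRequired method lets other entry points reuse the
cache-refresh logic instead of duplicating it. A hypothetical call site
(variable names assumed from the surrounding diff, not an actual caller in
this commit):

// Inside another input-format code path that already has these in scope:
CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>();
format.refreshSegmentCacheIfRequired(job, carbonTable, updateStatusManager, filteredSegmentToAccess);
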
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e93d7b8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
index 68f8ca7..97bcb5f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -149,7 +149,38 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
     sql("clean files for table flatfolder_delete")
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length == 1)
     assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 0)
+    sql("drop table if exists flatfolder_delete")
+  }
 
+  test("merge index flat folder and delete delta issue with GLOBAL SORT") {
+    sql("drop table if exists flatfolder_delete")
+    sql(
+      """
+        | CREATE TABLE flatfolder_delete (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,empno int)
+        | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true', 'SORT_SCOPE'='GLOBAL_SORT' )
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='4')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='4')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='4')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='4')""")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "flatfolder_delete")
+    sql(s"""delete from flatfolder_delete where empname='anandh'""")
+    sql(s"""delete from flatfolder_delete where empname='arvind'""")
+    sql(s"""select * from flatfolder_delete""").show()
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+             .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 8)
+    sql("Alter table flatfolder_delete compact 'minor'")
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+             .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 8)
+    sql("clean files for table flatfolder_delete")
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+             .filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length == 1)
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+             .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 0)
+    sql("drop table if exists flatfolder_delete")
   }
 
   override def afterAll = {
