Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 cc2a74e65 -> 4544765a4


[CARBONDATA-2321] Fix selection of partition column after concurrent load fails 
randomly

Fix selection of partition column after concurrent load fails randomly

This closes #2155


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4544765a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4544765a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4544765a

Branch: refs/heads/branch-1.3
Commit: 4544765a486b3fb1fbd4513992fbd79d3f9565ea
Parents: cc2a74e
Author: Jatin <jatin.de...@knoldus.in>
Authored: Wed Apr 11 12:04:31 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Wed Apr 11 14:13:54 2018 +0530

----------------------------------------------------------------------
 .../core/writer/CarbonIndexFileMergeWriter.java   | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4544765a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index bc150e5..e618f7f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -71,9 +71,9 @@ public class CarbonIndexFileMergeWriter {
     if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != 
null) {
       if (sfs == null) {
         return mergeNormalSegment(indexFileNamesTobeAdded, 
readFileFooterFromCarbonDataFile,
-            segmentPath, indexFiles);
+            segmentPath, indexFiles, segmentId);
       } else {
-        return mergePartitionSegment(indexFileNamesTobeAdded, sfs, indexFiles);
+        return mergePartitionSegment(indexFileNamesTobeAdded, sfs, indexFiles, 
segmentId);
       }
     }
     return null;
@@ -81,7 +81,8 @@ public class CarbonIndexFileMergeWriter {
 
 
   private SegmentIndexFIleMergeStatus mergeNormalSegment(List<String> 
indexFileNamesTobeAdded,
-      boolean readFileFooterFromCarbonDataFile, String segmentPath, 
CarbonFile[] indexFiles)
+      boolean readFileFooterFromCarbonDataFile, String segmentPath, 
CarbonFile[] indexFiles,
+      String segmentId)
       throws IOException {
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
     if (readFileFooterFromCarbonDataFile) {
@@ -93,7 +94,7 @@ public class CarbonIndexFileMergeWriter {
       fileStore.readAllIIndexOfSegment(segmentPath);
     }
     Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
-    writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap);
+    writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap, 
segmentId);
     for (CarbonFile indexFile : indexFiles) {
       indexFile.delete();
     }
@@ -101,7 +102,7 @@ public class CarbonIndexFileMergeWriter {
   }
 
   private SegmentIndexFIleMergeStatus mergePartitionSegment(List<String> 
indexFileNamesTobeAdded,
-      SegmentFileStore sfs, CarbonFile[] indexFiles) throws IOException {
+      SegmentFileStore sfs, CarbonFile[] indexFiles, String segmentId) throws 
IOException {
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
     fileStore
         .readAllIIndexOfSegment(sfs.getSegmentFile(), sfs.getTablePath(), 
SegmentStatus.SUCCESS,
@@ -119,7 +120,7 @@ public class CarbonIndexFileMergeWriter {
     }
     for (Map.Entry<String, Map<String, byte[]>> entry : 
indexLocationMap.entrySet()) {
       String mergeIndexFile =
-          writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), 
entry.getValue());
+          writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), 
entry.getValue(), segmentId);
       for (Map.Entry<String, SegmentFileStore.FolderDetails> segentry : 
sfs.getLocationMap()
           .entrySet()) {
         String location = segentry.getKey();
@@ -141,7 +142,7 @@ public class CarbonIndexFileMergeWriter {
   }
 
   private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, 
String segmentPath,
-      Map<String, byte[]> indexMap) throws IOException {
+      Map<String, byte[]> indexMap, String segmentId) throws IOException {
     MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
     MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
     List<String> fileNames = new ArrayList<>(indexMap.size());
@@ -154,7 +155,8 @@ public class CarbonIndexFileMergeWriter {
       }
     }
     if (fileNames.size() > 0) {
-      String mergeIndexName = System.currentTimeMillis() + 
CarbonTablePath.MERGE_INDEX_FILE_EXT;
+      String mergeIndexName =
+          segmentId + '_' + System.currentTimeMillis() + 
CarbonTablePath.MERGE_INDEX_FILE_EXT;
       openThriftWriter(segmentPath + "/" + mergeIndexName);
       indexHeader.setFile_names(fileNames);
       mergedBlockIndex.setFileData(data);

Reply via email to