Repository: carbondata
Updated Branches:
  refs/heads/master a06939b69 -> 41b007470


[CARBONDATA-2037]Store carbondata locations in datamap to make the datamap 
retrieval faster

Currently carbondata block locations are fetched from the namenode for each
query, which makes queries slower. So this PR stores the block locations while
loading the datamap and retrieves them from it.
1. Store carbondata locations in datamap to make the datamap retrieval faster.
2. Add method to convert unsafe to safe to avoid multiple calculations.

This closes #1810


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/41b00747
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/41b00747
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/41b00747

Branch: refs/heads/master
Commit: 41b0074705fae4ff11202caf4603451a89320024
Parents: a06939b
Author: ravipesala <[email protected]>
Authored: Tue Jan 16 13:37:50 2018 +0530
Committer: manishgupta88 <[email protected]>
Committed: Thu Jan 18 20:07:41 2018 +0530

----------------------------------------------------------------------
 .../filesystem/AbstractDFSCarbonFile.java       |  54 ++++++++++
 .../datastore/filesystem/AlluxioCarbonFile.java |  19 +---
 .../core/datastore/filesystem/CarbonFile.java   |  12 +++
 .../datastore/filesystem/HDFSCarbonFile.java    |  21 +---
 .../datastore/filesystem/LocalCarbonFile.java   |   8 ++
 .../datastore/filesystem/ViewFSCarbonFile.java  |  20 +---
 .../indexstore/BlockletDataMapIndexStore.java   |  70 +++++--------
 .../core/indexstore/ExtendedBlocklet.java       |  27 +----
 .../core/indexstore/UnsafeMemoryDMStore.java    |   2 +-
 .../blockletindex/BlockletDMComparator.java     |   2 +-
 .../blockletindex/BlockletDataMap.java          |  89 ++++++++++++-----
 .../blockletindex/BlockletDataMapModel.java     |  10 +-
 .../blockletindex/SegmentIndexFileStore.java    |  34 +++++++
 .../core/indexstore/row/DataMapRowImpl.java     |  13 +++
 .../core/indexstore/row/UnsafeDataMapRow.java   | 100 +++++++++++++++++++
 .../core/metadata/PartitionMapFileStore.java    |  38 +++++++
 .../hadoop/api/CarbonTableInputFormat.java      |  11 +-
 17 files changed, 368 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index a8513cf..7b634d2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -24,6 +24,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -32,11 +34,14 @@ import 
org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -459,4 +464,53 @@ public abstract  class AbstractDFSCarbonFile implements 
CarbonFile {
   public void setPermission(String directoryPath, FsPermission permission, 
String username,
       String group) throws IOException {
   }
+
+  @Override
+  public CarbonFile[] listFiles() {
+    FileStatus[] listStatus = null;
+    try {
+      if (null != fileStatus && fileStatus.isDirectory()) {
+        Path path = fileStatus.getPath();
+        listStatus = 
path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
+      } else {
+        return new CarbonFile[0];
+      }
+    } catch (IOException e) {
+      LOGGER.error("Exception occured: " + e.getMessage());
+      return new CarbonFile[0];
+    }
+    return getFiles(listStatus);
+  }
+
+  @Override
+  public CarbonFile[] locationAwareListFiles() throws IOException {
+    if (null != fileStatus && fileStatus.isDirectory()) {
+      List<FileStatus> listStatus = new ArrayList<>();
+      Path path = fileStatus.getPath();
+      RemoteIterator<LocatedFileStatus> iter =
+          
path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path);
+      while (iter.hasNext()) {
+        listStatus.add(iter.next());
+      }
+      return getFiles(listStatus.toArray(new FileStatus[listStatus.size()]));
+    }
+    return new CarbonFile[0];
+  }
+
+  /**
+   * Get the CarbonFiles from filestatus array
+   */
+  protected abstract CarbonFile[] getFiles(FileStatus[] listStatus);
+
+  @Override public String[] getLocations() throws IOException {
+    BlockLocation[] blkLocations;
+    if (fileStatus instanceof LocatedFileStatus) {
+      blkLocations = ((LocatedFileStatus)fileStatus).getBlockLocations();
+    } else {
+      FileSystem fs = 
fileStatus.getPath().getFileSystem(FileFactory.getConfiguration());
+      blkLocations = fs.getFileBlockLocations(fileStatus.getPath(), 0L, 
fileStatus.getLen());
+    }
+
+    return blkLocations[0].getHosts();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
index c3ccd0c..e0df6ae 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
@@ -55,7 +55,8 @@ public class AlluxioCarbonFile extends AbstractDFSCarbonFile {
    * @param listStatus
    * @return
    */
-  private CarbonFile[] getFiles(FileStatus[] listStatus) {
+  @Override
+  protected CarbonFile[] getFiles(FileStatus[] listStatus) {
     if (listStatus == null) {
       return new CarbonFile[0];
     }
@@ -66,22 +67,6 @@ public class AlluxioCarbonFile extends AbstractDFSCarbonFile 
{
     return files;
   }
 
-  @Override
-  public CarbonFile[] listFiles() {
-    FileStatus[] listStatus = null;
-    try {
-      if (null != fileStatus && fileStatus.isDirectory()) {
-        Path path = fileStatus.getPath();
-        listStatus = 
path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
-      } else {
-        return new CarbonFile[0];
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage());
-      return new CarbonFile[0];
-    }
-    return getFiles(listStatus);
-  }
 
   @Override
   public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index 16cd0e0..5f172fc 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -34,6 +34,12 @@ public interface CarbonFile {
 
   CarbonFile[] listFiles();
 
+  /**
+   * It returns list of files with location details.
+   * @return
+   */
+  CarbonFile[] locationAwareListFiles() throws IOException;
+
   String getName();
 
   boolean isDirectory();
@@ -132,4 +138,10 @@ public interface CarbonFile {
   void setPermission(String directoryPath, FsPermission permission, String 
username, String group)
       throws IOException;
 
+  /**
+   * Returns locations of the file
+   * @return
+   */
+  String[] getLocations() throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
index a2fa6b6..d470b47 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -62,7 +61,8 @@ public class HDFSCarbonFile extends AbstractDFSCarbonFile {
    * @param listStatus
    * @return
    */
-  private CarbonFile[] getFiles(FileStatus[] listStatus) {
+  @Override
+  protected CarbonFile[] getFiles(FileStatus[] listStatus) {
     if (listStatus == null) {
       return new CarbonFile[0];
     }
@@ -74,23 +74,6 @@ public class HDFSCarbonFile extends AbstractDFSCarbonFile {
   }
 
   @Override
-  public CarbonFile[] listFiles() {
-    FileStatus[] listStatus = null;
-    try {
-      if (null != fileStatus && fileStatus.isDirectory()) {
-        Path path = fileStatus.getPath();
-        listStatus = 
path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
-      } else {
-        return new CarbonFile[0];
-      }
-    } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage());
-      return new CarbonFile[0];
-    }
-    return getFiles(listStatus);
-  }
-
-  @Override
   public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
     CarbonFile[] files = listFiles();
     if (files != null && files.length >= 1) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 39ca521..4ce78be 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -423,4 +423,12 @@ public class LocalCarbonFile implements CarbonFile {
   public void setPermission(String directoryPath, FsPermission permission, 
String username,
       String group) throws IOException {
   }
+
+  @Override public CarbonFile[] locationAwareListFiles() throws IOException {
+    return listFiles();
+  }
+
+  @Override public String[] getLocations() throws IOException {
+    return new String[]{"localhost"};
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
index e05112d..6650b9c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
@@ -52,7 +52,8 @@ public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
    * @param listStatus
    * @return
    */
-  private CarbonFile[] getFiles(FileStatus[] listStatus) {
+  @Override
+  protected CarbonFile[] getFiles(FileStatus[] listStatus) {
     if (listStatus == null) {
       return new CarbonFile[0];
     }
@@ -64,23 +65,6 @@ public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
   }
 
   @Override
-  public CarbonFile[] listFiles() {
-    FileStatus[] listStatus = null;
-    try {
-      if (null != fileStatus && fileStatus.isDirectory()) {
-        Path path = fileStatus.getPath();
-        listStatus = 
path.getFileSystem(FileFactory.getConfiguration()).listStatus(path);
-      } else {
-        return new CarbonFile[0];
-      }
-    } catch (IOException ex) {
-      LOGGER.error("Exception occured" + ex.getMessage());
-      return new CarbonFile[0];
-    }
-    return getFiles(listStatus);
-  }
-
-  @Override
   public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
     CarbonFile[] files = listFiles();
     if (files != null && files.length >= 1) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index f2beae7..8eae974 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -21,22 +21,20 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.PartitionMapFileStore;
-import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
@@ -80,11 +78,17 @@ public class BlockletDataMapIndexStore
         String segmentPath = CarbonTablePath.getSegmentPath(
             identifier.getAbsoluteTableIdentifier().getTablePath(),
             identifier.getSegmentId());
+        Map<String, String[]> locationMap = new HashMap<>();
+        CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+        CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles();
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
-        indexFileStore.readAllIIndexOfSegment(segmentPath);
+        indexFileStore.readAllIIndexOfSegment(carbonFiles);
         PartitionMapFileStore partitionFileStore = new PartitionMapFileStore();
-        partitionFileStore.readAllPartitionsOfSegment(segmentPath);
-        dataMap = loadAndGetDataMap(identifier, indexFileStore, 
partitionFileStore);
+        partitionFileStore.readAllPartitionsOfSegment(carbonFiles, 
segmentPath);
+        for (CarbonFile file : carbonFiles) {
+          locationMap.put(file.getAbsolutePath(), file.getLocations());
+        }
+        dataMap = loadAndGetDataMap(identifier, indexFileStore, 
partitionFileStore, locationMap);
       } catch (MemoryException e) {
         LOGGER.error("memory exception when loading datamap: " + 
e.getMessage());
         throw new RuntimeException(e.getMessage(), e);
@@ -112,11 +116,8 @@ public class BlockletDataMapIndexStore
       if (missedIdentifiers.size() > 0) {
         Map<String, SegmentIndexFileStore> segmentIndexFileStoreMap = new 
HashMap<>();
         Map<String, PartitionMapFileStore> partitionFileStoreMap = new 
HashMap<>();
-        service =
-            Executors.newCachedThreadPool(
-                new CarbonThreadFactory("BlockletDataMapIndexStore:" + 
missedIdentifiers.get(0)
-                    .getAbsoluteTableIdentifier().getTableName()));
-        List<Future<BlockletDataMap>> futureList = new ArrayList<>();
+        Map<String, String[]> locationMap = new HashMap<>();
+
         for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
           SegmentIndexFileStore indexFileStore =
               segmentIndexFileStoreMap.get(identifier.getSegmentId());
@@ -126,21 +127,20 @@ public class BlockletDataMapIndexStore
               identifier.getAbsoluteTableIdentifier().getTablePath(),
               identifier.getSegmentId());
           if (indexFileStore == null) {
+            CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+            CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles();
             indexFileStore = new SegmentIndexFileStore();
-            indexFileStore.readAllIIndexOfSegment(segmentPath);
+            indexFileStore.readAllIIndexOfSegment(carbonFiles);
             segmentIndexFileStoreMap.put(identifier.getSegmentId(), 
indexFileStore);
-          }
-          if (partitionFileStore == null) {
             partitionFileStore = new PartitionMapFileStore();
-            partitionFileStore.readAllPartitionsOfSegment(segmentPath);
+            partitionFileStore.readAllPartitionsOfSegment(carbonFiles, 
segmentPath);
             partitionFileStoreMap.put(identifier.getSegmentId(), 
partitionFileStore);
+            for (CarbonFile file : carbonFiles) {
+              locationMap.put(file.getAbsolutePath(), file.getLocations());
+            }
           }
-          BlockletDataMapLoader blockletDataMapLoader =
-              new BlockletDataMapLoader(identifier, indexFileStore, 
partitionFileStore);
-          futureList.add(service.submit(blockletDataMapLoader));
-        }
-        for (Future<BlockletDataMap> dataMapFuture : futureList) {
-          blockletDataMaps.add(dataMapFuture.get());
+          blockletDataMaps.add(
+              loadAndGetDataMap(identifier, indexFileStore, 
partitionFileStore, locationMap));
         }
       }
     } catch (Throwable e) {
@@ -180,27 +180,6 @@ public class BlockletDataMapIndexStore
   }
 
   /**
-   * This class is used to parallelize reading of index files.
-   */
-  private class BlockletDataMapLoader implements Callable<BlockletDataMap> {
-
-    private TableBlockIndexUniqueIdentifier identifier;
-    private SegmentIndexFileStore indexFileStore;
-    private PartitionMapFileStore partitionFileStore;
-
-    public BlockletDataMapLoader(TableBlockIndexUniqueIdentifier identifier,
-        SegmentIndexFileStore indexFileStore, PartitionMapFileStore 
partitionFileStore) {
-      this.identifier = identifier;
-      this.indexFileStore = indexFileStore;
-      this.partitionFileStore = partitionFileStore;
-    }
-
-    @Override public BlockletDataMap call() throws Exception {
-      return loadAndGetDataMap(identifier, indexFileStore, partitionFileStore);
-    }
-  }
-
-  /**
    * Below method will be used to load the segment of segments
    * One segment may have multiple task , so  table segment will be loaded
    * based on task id and will return the map of taksId to table segment
@@ -212,7 +191,8 @@ public class BlockletDataMapIndexStore
   private BlockletDataMap loadAndGetDataMap(
       TableBlockIndexUniqueIdentifier identifier,
       SegmentIndexFileStore indexFileStore,
-      PartitionMapFileStore partitionFileStore)
+      PartitionMapFileStore partitionFileStore,
+      Map<String, String[]> locationMap)
       throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         identifier.getUniqueTableSegmentIdentifier();
@@ -226,7 +206,7 @@ public class BlockletDataMapIndexStore
       dataMap.init(new BlockletDataMapModel(identifier.getFilePath(),
           indexFileStore.getFileData(identifier.getCarbonIndexFileName()),
           
partitionFileStore.getPartitions(identifier.getCarbonIndexFileName()),
-          partitionFileStore.isPartionedSegment()));
+          partitionFileStore.isPartionedSegment(), locationMap));
       lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
           dataMap.getMemorySize());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index e0cfefb..d1bfa35 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -16,15 +16,6 @@
  */
 package org.apache.carbondata.core.indexstore;
 
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-
 /**
  * Detailed blocklet information
  */
@@ -50,25 +41,15 @@ public class ExtendedBlocklet extends Blocklet {
     this.detailInfo = detailInfo;
   }
 
-  /**
-   * It gets the hdfs block locations and length for this blocklet. It is used 
internally to get the
-   * locations for allocating tasks.
-   * @throws IOException
-   */
-  public void updateLocations() throws IOException {
-    Path path = new Path(getPath());
-    FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
-    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
-    LocatedFileStatus fileStatus = iter.next();
-    location = fileStatus.getBlockLocations()[0].getHosts();
-    length = fileStatus.getLen();
+  public void setLocation(String[] location) {
+    this.location = location;
   }
 
-  public String[] getLocations() throws IOException {
+  public String[] getLocations() {
     return location;
   }
 
-  public long getLength() throws IOException {
+  public long getLength() {
     return length;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 73b7b60..dc630ff 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -168,7 +168,7 @@ public class UnsafeMemoryDMStore {
     }
   }
 
-  public DataMapRow getUnsafeRow(int index) {
+  public UnsafeDataMapRow getUnsafeRow(int index) {
     assert (index < rowCount);
     return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
index 9a50600..fccbda8 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDMComparator.java
@@ -63,7 +63,7 @@ public class BlockletDMComparator implements 
Comparator<DataMapRow> {
     int compareResult = 0;
     int processedNoDictionaryColumn = numberOfNoDictSortColumns;
     byte[][] firstBytes = splitKey(first.getByteArray(0));
-    byte[][] secondBytes = splitKey(first.getByteArray(0));
+    byte[][] secondBytes = splitKey(second.getByteArray(0));
     byte[] firstNoDictionaryKeys = firstBytes[1];
     ByteBuffer firstNoDictionaryKeyBuffer = 
ByteBuffer.wrap(firstNoDictionaryKeys);
     byte[] secondNoDictionaryKeys = secondBytes[1];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 7c620d7..ee74fad 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -22,6 +22,7 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -65,6 +66,7 @@ import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
+import org.apache.commons.lang3.StringUtils;
 import org.xerial.snappy.Snappy;
 
 /**
@@ -97,6 +99,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private static int BLOCK_FOOTER_OFFSET = 9;
 
+  private static int LOCATIONS = 10;
+
   private static int TASK_MIN_VALUES_INDEX = 0;
 
   private static int TASK_MAX_VALUES_INDEX = 1;
@@ -136,15 +140,23 @@ public class BlockletDataMap implements DataMap, 
Cacheable {
         createSummarySchema(segmentProperties, 
blockletDataMapInfo.getPartitions(), schemaBinary);
       }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
-      if (fileFooter.getBlockletList() == null) {
-        // This is old store scenario, here blocklet information is not 
available in  index file so
-        // load only block info
-        summaryRow =
-            loadToUnsafeBlock(fileFooter, segmentProperties, 
blockInfo.getFilePath(), summaryRow);
-      } else {
-        // Here it loads info about all blocklets of index
-        summaryRow =
-            loadToUnsafe(fileFooter, segmentProperties, 
blockInfo.getFilePath(), summaryRow);
+      String[] locations = 
blockletDataMapInfo.getLocationMap().get(blockInfo.getFilePath());
+      // Here it loads info about all blocklets of index
+      // Only add if the file exists physically. There are scenarios which 
index file exists inside
+      // merge index but related carbondata files are deleted. In that case we 
first check whether
+      // the file exists physically or not
+      if (locations != null) {
+        if (fileFooter.getBlockletList() == null) {
+          // This is old store scenario, here blocklet information is not 
available in index file so
+          // load only block info
+          summaryRow =
+              loadToUnsafeBlock(fileFooter, segmentProperties, 
blockInfo.getFilePath(), summaryRow,
+                  locations);
+        } else {
+          summaryRow =
+              loadToUnsafe(fileFooter, segmentProperties, 
blockInfo.getFilePath(), summaryRow,
+                  locations);
+        }
       }
     }
     if (unsafeMemoryDMStore != null) {
@@ -163,7 +175,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
   }
 
   private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
-      SegmentProperties segmentProperties, String filePath, DataMapRowImpl 
summaryRow) {
+      SegmentProperties segmentProperties, String filePath, DataMapRowImpl 
summaryRow,
+      String[] locations) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     List<BlockletInfo> blockletList = fileFooter.getBlockletList();
     CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
@@ -221,7 +234,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
         serializedData = stream.toByteArray();
         row.setByteArray(serializedData, ordinal++);
         // Add block footer offset, it is used if we need to read footer of 
block
-        
row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), 
ordinal);
+        
row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), 
ordinal++);
+        setLocations(locations, row, ordinal);
+
         unsafeMemoryDMStore.addIndexRowToUnsafe(row);
       } catch (Exception e) {
         throw new RuntimeException(e);
@@ -231,13 +246,21 @@ public class BlockletDataMap implements DataMap, 
Cacheable {
     return summaryRow;
   }
 
+  private void setLocations(String[] locations, DataMapRow row, int ordinal)
+      throws UnsupportedEncodingException {
+    // Add location info
+    String locationStr = StringUtils.join(locations, ',');
+    
row.setByteArray(locationStr.getBytes(CarbonCommonConstants.DEFAULT_CHARSET), 
ordinal);
+  }
+
   /**
    * Load information for the block.It is the case can happen only for old 
stores
    * where blocklet information is not available in index file. So load only 
block information
    * and read blocklet information in executor.
    */
   private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter,
-      SegmentProperties segmentProperties, String filePath, DataMapRowImpl 
summaryRow) {
+      SegmentProperties segmentProperties, String filePath, DataMapRowImpl 
summaryRow,
+      String[] locations) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
     CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
@@ -286,8 +309,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
     // add blocklet info
     row.setByteArray(new byte[0], ordinal++);
 
-    
row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), 
ordinal);
+    
row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), 
ordinal++);
     try {
+      setLocations(locations, row, ordinal);
       unsafeMemoryDMStore.addIndexRowToUnsafe(row);
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -484,6 +508,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
     // for block footer offset.
     indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
 
+    // for locations
+    indexSchemas.add(new 
CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY));
+
     unsafeMemoryDMStore =
         new UnsafeMemoryDMStore(indexSchemas.toArray(new 
CarbonRowSchema[indexSchemas.size()]));
   }
@@ -558,7 +585,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   @Override
   public List<Blocklet> prune(FilterResolverIntf filterExp) {
-
+    if (unsafeMemoryDMStore.getRowCount() == 0) {
+      return new ArrayList<>();
+    }
     // getting the start and end index key based on filter for hitting the
     // selected block reference nodes based on filter resolver tree.
     if (LOGGER.isDebugEnabled()) {
@@ -598,8 +627,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
     if (filterExp == null) {
       int rowCount = unsafeMemoryDMStore.getRowCount();
       for (int i = 0; i < rowCount; i++) {
-        DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(i);
-        blocklets.add(createBlocklet(unsafeRow, i));
+        DataMapRow safeRow = 
unsafeMemoryDMStore.getUnsafeRow(i).convertToSafeRow();
+        blocklets.add(createBlocklet(safeRow, i));
       }
     } else {
       int startIndex = findStartIndex(convertToRow(searchStartKey), 
comparator);
@@ -607,14 +636,14 @@ public class BlockletDataMap implements DataMap, 
Cacheable {
       FilterExecuter filterExecuter =
           FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
       while (startIndex <= endIndex) {
-        DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(startIndex);
-        String filePath = new String(unsafeRow.getByteArray(FILE_PATH_INDEX),
+        DataMapRow safeRow = 
unsafeMemoryDMStore.getUnsafeRow(startIndex).convertToSafeRow();
+        String filePath = new String(safeRow.getByteArray(FILE_PATH_INDEX),
             CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
         boolean isValid =
-            addBlockBasedOnMinMaxValue(filterExecuter, 
getMinMaxValue(unsafeRow, MAX_VALUES_INDEX),
-                getMinMaxValue(unsafeRow, MIN_VALUES_INDEX), filePath, 
startIndex);
+            addBlockBasedOnMinMaxValue(filterExecuter, getMinMaxValue(safeRow, 
MAX_VALUES_INDEX),
+                getMinMaxValue(safeRow, MIN_VALUES_INDEX), filePath, 
startIndex);
         if (isValid) {
-          blocklets.add(createBlocklet(unsafeRow, startIndex));
+          blocklets.add(createBlocklet(safeRow, startIndex));
         }
         startIndex++;
       }
@@ -624,6 +653,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
   }
 
   @Override public List<Blocklet> prune(FilterResolverIntf filterExp, 
List<String> partitions) {
+    if (unsafeMemoryDMStore.getRowCount() == 0) {
+      return new ArrayList<>();
+    }
     // First get the partitions which are stored inside datamap.
     List<String> storedPartitions = getPartitions();
     // if it has partitioned datamap but there is no partitioned information 
stored, it means
@@ -675,8 +707,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   public ExtendedBlocklet getDetailedBlocklet(String blockletId) {
     int index = Integer.parseInt(blockletId);
-    DataMapRow unsafeRow = unsafeMemoryDMStore.getUnsafeRow(index);
-    return createBlocklet(unsafeRow, index);
+    DataMapRow safeRow = 
unsafeMemoryDMStore.getUnsafeRow(index).convertToSafeRow();
+    return createBlocklet(safeRow, index);
   }
 
   private byte[][] getMinMaxValue(DataMapRow row, int index) {
@@ -701,16 +733,19 @@ public class BlockletDataMap implements DataMap, 
Cacheable {
     
detailInfo.setSchemaUpdatedTimeStamp(row.getLong(SCHEMA_UPADATED_TIME_INDEX));
     byte[] byteArray = row.getByteArray(BLOCK_INFO_INDEX);
     BlockletInfo blockletInfo = null;
-    if (byteArray.length > 0) {
-      try {
+    try {
+      if (byteArray.length > 0) {
         blockletInfo = new BlockletInfo();
         ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
         DataInputStream inputStream = new DataInputStream(stream);
         blockletInfo.readFields(inputStream);
         inputStream.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
       }
+      blocklet.setLocation(
+          new String(row.getByteArray(LOCATIONS), 
CarbonCommonConstants.DEFAULT_CHARSET)
+              .split(","));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
     detailInfo.setBlockletInfo(blockletInfo);
     blocklet.setDetailInfo(detailInfo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index 704e0f7..85293a1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.core.indexstore.blockletindex;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 
@@ -31,12 +32,15 @@ public class BlockletDataMapModel extends DataMapModel {
 
   private boolean partitionedSegment;
 
+  private Map<String, String[]> locationMap;
+
   public BlockletDataMapModel(String filePath, byte[] fileData, List<String> 
partitions,
-      boolean partitionedSegment) {
+      boolean partitionedSegment, Map<String, String[]> locationMap) {
     super(filePath);
     this.fileData = fileData;
     this.partitions = partitions;
     this.partitionedSegment = partitionedSegment;
+    this.locationMap = locationMap;
   }
 
   public byte[] getFileData() {
@@ -50,4 +54,8 @@ public class BlockletDataMapModel extends DataMapModel {
   public boolean isPartitionedSegment() {
     return partitionedSegment;
   }
+
+  public Map<String, String[]> getLocationMap() {
+    return locationMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 9603090..a30b04c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -97,6 +97,23 @@ public class SegmentIndexFileStore {
   }
 
   /**
+   * Read all the index files of the segment and keep their content in the cache.
+   *
+   * @param carbonFiles files of the segment; index and merge-index files among them are read
+   * @throws IOException
+   */
+  public void readAllIIndexOfSegment(CarbonFile[] carbonFiles) throws 
IOException {
+    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(carbonFiles);
+    for (int i = 0; i < carbonIndexFiles.length; i++) {
+      if 
(carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        readMergeFile(carbonIndexFiles[i].getCanonicalPath());
+      } else if 
(carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        readIndexFile(carbonIndexFiles[i]);
+      }
+    }
+  }
+
+  /**
    * Read all index file names of the segment
    *
    * @param segmentPath
@@ -211,6 +228,23 @@ public class SegmentIndexFileStore {
   }
 
   /**
+   * List all the index files of the segment.
+   *
+   * @param carbonFiles files to filter
+   * @return the index and merge-index files among the passed files
+   */
+  public static CarbonFile[] getCarbonIndexFiles(CarbonFile[] carbonFiles) {
+    List<CarbonFile> indexFiles = new ArrayList<>();
+    for (CarbonFile file: carbonFiles) {
+      if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+          file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFiles.add(file);
+      }
+    }
+    return indexFiles.toArray(new CarbonFile[indexFiles.size()]);
+  }
+
+  /**
    * Return the map that contain index file name and content of the file.
    *
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
index 0bb4a5c..39536f5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -26,6 +26,8 @@ public class DataMapRowImpl extends DataMapRow {
 
   private Object[] data;
 
+  private int totalLengthInBytes;
+
   public DataMapRowImpl(CarbonRowSchema[] schemas) {
     super(schemas);
     this.data = new Object[schemas.length];
@@ -107,4 +109,15 @@ public class DataMapRowImpl extends DataMapRow {
     return (Double) data[ordinal];
   }
 
+  public void setTotalLengthInBytes(int totalLengthInBytes) {
+    this.totalLengthInBytes = totalLengthInBytes;
+  }
+
+  @Override public int getTotalSizeInBytes() {
+    if (totalLengthInBytes > 0) {
+      return totalLengthInBytes;
+    } else {
+      return super.getTotalSizeInBytes();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 932865d..1b95984 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.indexstore.row;
 
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
 import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
@@ -151,6 +153,104 @@ public class UnsafeDataMapRow extends DataMapRow {
     throw new UnsupportedOperationException("Not supported to set on unsafe 
row");
   }
 
+  /**
+   * Convert unsafe to safe row.
+   *
+   * @return a safe (heap-backed) copy of this unsafe row
+   */
+  public DataMapRow convertToSafeRow() {
+    DataMapRowImpl row = new DataMapRowImpl(schemas);
+    int runningLength = 0;
+    for (int i = 0; i < schemas.length; i++) {
+      CarbonRowSchema schema = schemas[i];
+      switch (schema.getSchemaType()) {
+        case FIXED:
+          DataType dataType = schema.getDataType();
+          if (dataType == DataTypes.BYTE) {
+            row.setByte(
+                getUnsafe().getByte(
+                    block.getBaseObject(),
+                    block.getBaseOffset() + pointer + runningLength),
+                i);
+            runningLength += schema.getLength();
+          } else if (dataType == DataTypes.SHORT) {
+            row.setShort(
+                getUnsafe().getShort(
+                    block.getBaseObject(),
+                    block.getBaseOffset() + pointer + runningLength),
+                i);
+            runningLength += schema.getLength();
+          } else if (dataType == DataTypes.INT) {
+            row.setInt(
+                getUnsafe().getInt(
+                    block.getBaseObject(),
+                    block.getBaseOffset() + pointer + runningLength),
+                i);
+            runningLength += schema.getLength();
+          } else if (dataType == DataTypes.LONG) {
+            row.setLong(
+                getUnsafe().getLong(
+                    block.getBaseObject(),
+                    block.getBaseOffset() + pointer + runningLength),
+                i);
+            runningLength += schema.getLength();
+          } else if (dataType == DataTypes.FLOAT) {
+            row.setFloat(
+                getUnsafe().getFloat(block.getBaseObject(),
+                    block.getBaseOffset() + pointer + runningLength),
+                i);
+            runningLength += schema.getLength();
+          } else if (dataType == DataTypes.DOUBLE) {
+            row.setDouble(
+                getUnsafe().getDouble(block.getBaseObject(),
+                    block.getBaseOffset() + pointer + runningLength),
+                i);
+            runningLength += schema.getLength();
+          } else if (dataType == DataTypes.BYTE_ARRAY) {
+            byte[] data = new byte[schema.getLength()];
+            getUnsafe().copyMemory(
+                block.getBaseObject(),
+                block.getBaseOffset() + pointer + runningLength,
+                    data,
+                BYTE_ARRAY_OFFSET,
+                data.length);
+            row.setByteArray(data, i);
+            runningLength += data.length;
+          } else {
+            throw new UnsupportedOperationException(
+                "unsupported data type for unsafe storage: " + 
schema.getDataType());
+          }
+          break;
+        case VARIABLE:
+          short length = getUnsafe().getShort(
+              block.getBaseObject(),
+              block.getBaseOffset() + pointer + runningLength);
+          runningLength += 2;
+          byte[] data = new byte[length];
+          getUnsafe().copyMemory(
+              block.getBaseObject(),
+              block.getBaseOffset() + pointer + runningLength,
+                  data,
+              BYTE_ARRAY_OFFSET,
+              data.length);
+          runningLength += data.length;
+          row.setByteArray(data, i);
+          break;
+        case STRUCT:
+          DataMapRow structRow = ((UnsafeDataMapRow) 
getRow(i)).convertToSafeRow();
+          row.setRow(structRow, i);
+          runningLength += structRow.getTotalSizeInBytes();
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "unsupported data type for unsafe storage: " + 
schema.getDataType());
+      }
+    }
+    row.setTotalLengthInBytes(runningLength);
+
+    return row;
+  }
+
   private int getSizeInBytes(int ordinal, int position) {
     switch (schemas[ordinal].getSchemaType()) {
       case FIXED:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index 3068cd9..355d083 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -170,6 +170,30 @@ public class PartitionMapFileStore {
     return null;
   }
 
+  private String getPartitionFilePath(CarbonFile[] carbonFiles, String 
segmentPath) {
+
+    List<CarbonFile> partitionFiles = new ArrayList<>();
+    for (CarbonFile file : carbonFiles) {
+      if (file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT)) {
+        partitionFiles.add(file);
+      }
+    }
+    if (partitionFiles.size() > 0) {
+      partionedSegment = true;
+      int i = 0;
+      // Get the latest partition map file based on the timestamp of that file.
+      long[] partitionTimestamps = new long[partitionFiles.size()];
+      for (CarbonFile file : partitionFiles) {
+        partitionTimestamps[i++] = Long.parseLong(file.getName()
+            .substring(0, file.getName().length() - 
CarbonTablePath.PARTITION_MAP_EXT.length()));
+      }
+      Arrays.sort(partitionTimestamps);
+      return segmentPath + "/" + 
partitionTimestamps[partitionTimestamps.length - 1]
+          + CarbonTablePath.PARTITION_MAP_EXT;
+    }
+    return null;
+  }
+
   private CarbonFile[] getPartitionFiles(String segmentPath) {
     CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
     if (carbonFile.exists()) {
@@ -228,6 +252,20 @@ public class PartitionMapFileStore {
     }
   }
 
+  /**
+   * Reads all partitions that exist inside the passed segment path.
+   * @param carbonFiles files of the segment to search for the partition map file
+   * @param segmentPath path of the segment
+   */
+  public void readAllPartitionsOfSegment(CarbonFile[] carbonFiles, String 
segmentPath)
+      throws IOException {
+    String partitionFilePath = getPartitionFilePath(carbonFiles, segmentPath);
+    if (partitionFilePath != null) {
+      partionedSegment = true;
+      PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
+      partitionMap.putAll(partitionMapper.getPartitionMap());
+    }
+  }
+
   public boolean isPartionedSegment() {
     return partionedSegment;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/41b00747/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 36c5f57..a1887f0 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.hadoop.api;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
@@ -780,15 +779,7 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     return resultFilterredBlocks;
   }
 
-  private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet)
-      throws IOException {
-    try {
-      blocklet.updateLocations();
-    } catch (FileNotFoundException e) {
-      // In case of clean files there is a chance of carbondata file is 
deleted but index file
-      // exist inside merged file. So just return null.
-      return null;
-    }
+  private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet 
blocklet) throws IOException {
     org.apache.carbondata.hadoop.CarbonInputSplit split =
         
org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
             blocklet.getBlockletId(), new FileSplit(new 
Path(blocklet.getPath()), 0,

Reply via email to