Repository: carbondata
Updated Branches:
  refs/heads/master a7926ea13 -> 531ecdf3f


http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 6803fc8..c6efd77 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -70,9 +70,15 @@ public class SegmentIndexFileStore {
    */
   private Map<String, byte[]> carbonIndexMapWithFullPath;
 
+  /**
+   * Stores the list of index files in a merge file
+   */
+  private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
+
   public SegmentIndexFileStore() {
     carbonIndexMap = new HashMap<>();
     carbonIndexMapWithFullPath = new HashMap<>();
+    carbonMergeFileToIndexFilesMap = new HashMap<>();
   }
 
   /**
@@ -201,6 +207,28 @@ public class SegmentIndexFileStore {
   }
 
   /**
+   * Read all index file names of the segment
+   *
+   * @param segmentPath
+   * @return
+   * @throws IOException
+   */
+  public Map<String, String> getMergeOrIndexFilesFromSegment(String segmentPath)
+      throws IOException {
+    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    Map<String, String> indexFiles = new HashMap<>();
+    for (int i = 0; i < carbonIndexFiles.length; i++) {
+      if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFiles
+            .put(carbonIndexFiles[i].getAbsolutePath(), carbonIndexFiles[i].getAbsolutePath());
+      } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null);
+      }
+    }
+    return indexFiles;
+  }
+
+  /**
    * List all the index files inside merge file.
    * @param mergeFile
    * @return
@@ -221,13 +249,14 @@ public class SegmentIndexFileStore {
    * @param mergeFilePath
    * @throws IOException
    */
-  private void readMergeFile(String mergeFilePath) throws IOException {
+  public void readMergeFile(String mergeFilePath) throws IOException {
     ThriftReader thriftReader = new ThriftReader(mergeFilePath);
     try {
       thriftReader.open();
      MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
       MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
       List<String> file_names = indexHeader.getFile_names();
+      carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names);
       List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
       CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
       assert (file_names.size() == fileData.size());
@@ -298,8 +327,8 @@ public class SegmentIndexFileStore {
     CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
     return carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
-        return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT);
+        return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
       }
     });
   }
@@ -428,4 +457,8 @@ public class SegmentIndexFileStore {
             + " is " + (System.currentTimeMillis() - startTime));
     return carbondataFileFooter.getBlockletList();
   }
+
+  public Map<String, List<String>> getCarbonMergeFileToIndexFilesMap() {
+    return carbonMergeFileToIndexFilesMap;
+  }
 }
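
For context, a hedged sketch of how the new APIs fit together (the segment path is hypothetical and exception handling is omitted):

    SegmentIndexFileStore store = new SegmentIndexFileStore();
    // merge files map to their own path, plain .carbonindex files map to null
    Map<String, String> files =
        store.getMergeOrIndexFilesFromSegment("/store/default/t1/Fact/Part0/Segment_0");
    for (String file : files.keySet()) {
      if (file.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
        store.readMergeFile(file);
        // the merge file's contained index file names are now available
        List<String> contained = store.getCarbonMergeFileToIndexFilesMap().get(file);
      }
    }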

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index b764bdf..496a1d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -16,13 +16,15 @@
  */
 package org.apache.carbondata.core.indexstore.row;
 
+import java.io.Serializable;
+
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 
 /**
 * It is just a normal row to store data. Implementation classes could be safe and unsafe.
 * TODO move this class a global row and use across loading after DataType is changed class
  */
-public abstract class DataMapRow {
+public abstract class DataMapRow implements Serializable {
 
   protected CarbonRowSchema[] schemas;
 
@@ -88,4 +90,13 @@ public abstract class DataMapRow {
   public int getColumnCount() {
     return schemas.length;
   }
+
+  /**
+   * default implementation
+   *
+   * @return
+   */
+  public DataMapRow convertToSafeRow() {
+    return this;
+  }
 }
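
The default convertToSafeRow() simply returns this, so on-heap (safe) implementations need no change, while unsafe implementations can override it to copy data out of unsafe memory before the row is cached or serialized. A hedged illustration of the pattern, simplified to byte-array columns (SafeDataMapRow is a hypothetical stand-in for the concrete on-heap row type):

    // inside a hypothetical unsafe implementation
    @Override public DataMapRow convertToSafeRow() {
      DataMapRow safeRow = new SafeDataMapRow(schemas);  // hypothetical on-heap copy target
      for (int i = 0; i < getColumnCount(); i++) {
        // copy each column's bytes out of unsafe memory into the safe row
        safeRow.setByteArray(getByteArray(i), i);
      }
      return safeRow;
    }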

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 1b95984..1c1ecad 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -30,7 +30,12 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
  */
 public class UnsafeDataMapRow extends DataMapRow {
 
-  private MemoryBlock block;
+  private static final long serialVersionUID = -1156704133552046321L;
+
+  // As it is an unsafe memory block it is not recommended to serialize.
+  // If at all required to be serialized then override the writeObject method,
+  // which should take care of clearing the unsafe memory post serialization.
+  private transient MemoryBlock block;
 
   private int pointer;
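
If a caller ever does need to serialize such a row, the convertToSafeRow() hook added to DataMapRow is the intended path; the alternative the comment alludes to would be a custom Java serialization hook, roughly (a sketch only, not what this commit does):

    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
      // default serialization covers every field except the transient unsafe block
      out.defaultWriteObject();
      // an implementation would copy the unsafe bytes on-heap, write them,
      // and free the unsafe memory afterwards
    }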
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
index 813be4a..1a77467 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -16,12 +16,16 @@
  */
 package org.apache.carbondata.core.indexstore.schema;
 
+import java.io.Serializable;
+
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
 * It just has 2 types right now, either fixed or variable.
  */
-public abstract class CarbonRowSchema {
+public abstract class CarbonRowSchema implements Serializable {
+
+  private static final long serialVersionUID = -8061282029097686495L;
 
   protected DataType dataType;
 
@@ -29,6 +33,10 @@ public abstract class CarbonRowSchema {
     this.dataType = dataType;
   }
 
+  public void setDataType(DataType dataType) {
+    this.dataType = dataType;
+  }
+
   /**
    * Either fixed or variable length.
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index dff496b..9326b1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -492,6 +492,35 @@ public class SegmentFileStore {
   }
 
   /**
+   * Gets all index files from this segment
+   * @return
+   */
+  public Map<String, String> getIndexOrMergeFiles() {
+    Map<String, String> indexFiles = new HashMap<>();
+    if (segmentFile != null) {
+      for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+        String location = entry.getKey();
+        if (entry.getValue().isRelative) {
+          location = tablePath + location;
+        }
+        if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+          String mergeFileName = entry.getValue().getMergeFileName();
+          if (null != mergeFileName) {
+            indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + mergeFileName,
+                entry.getValue().mergeFileName);
+          } else {
+            for (String indexFile : entry.getValue().getFiles()) {
+              indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+                  entry.getValue().mergeFileName);
+            }
+            }
+          }
+        }
+      }
+    }
+    return indexFiles;
+  }
+
+  /**
    * Gets all carbon index files from this segment
    * @return
    */
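
In the returned map the key is the absolute index or merge file path and the value is the segment's merge file name (null when none exists), which is the shape TableStatusReadCommittedScope now consumes. A rough usage sketch (table path and segment file name hypothetical, exception handling omitted):

    SegmentFileStore fileStore =
        new SegmentFileStore("/store/default/t1", "0_1521012756709.segment");
    Map<String, String> committedIndexFiles = fileStore.getIndexOrMergeFiles();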

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index c8ac15a..c7bcf2e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -97,6 +97,11 @@ public class TableInfo implements Serializable, Writable {
 
   private List<RelationIdentifier> parentRelationIdentifiers;
 
+  /**
+   * flag to check whether any schema modification operation has happened after creation of table
+   */
+  private boolean isSchemaModified;
+
   public TableInfo() {
     dataMapSchemaList = new ArrayList<>();
     isTransactionalTable = true;
@@ -115,6 +120,18 @@ public class TableInfo implements Serializable, Writable {
   public void setFactTable(TableSchema factTable) {
     this.factTable = factTable;
     updateParentRelationIdentifier();
+    updateIsSchemaModified();
+  }
+
+  private void updateIsSchemaModified() {
+    if (null != factTable.getSchemaEvalution()) {
+      // If schema evolution entry list size is > 1 that means an alter operation is performed
+      // which has added the new schema entry in the schema evolution list.
+      // Currently apart from create table schema evolution entries
+      // are getting added only in the alter operations.
+      isSchemaModified =
+          factTable.getSchemaEvalution().getSchemaEvolutionEntryList().size() > 1 ? true : false;
+    }
   }
 
   private void updateParentRelationIdentifier() {
@@ -273,6 +290,7 @@ public class TableInfo implements Serializable, Writable {
         parentRelationIdentifiers.get(i).write(out);
       }
     }
+    out.writeBoolean(isSchemaModified);
   }
 
   @Override public void readFields(DataInput in) throws IOException {
@@ -308,6 +326,7 @@ public class TableInfo implements Serializable, Writable {
         this.parentRelationIdentifiers.add(relationIdentifier);
       }
     }
+    this.isSchemaModified = in.readBoolean();
   }
 
   public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {
@@ -343,4 +362,9 @@ public class TableInfo implements Serializable, Writable {
   public void setTransactionalTable(boolean transactionalTable) {
     isTransactionalTable = transactionalTable;
   }
+
+  public boolean isSchemaModified() {
+    return isSchemaModified;
+  }
+
 }
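
The flag is thus derived rather than independently maintained: a freshly created table has exactly one schema evolution entry and every ALTER appends one more, so a list size above one marks the schema as modified. A hedged consumer-side sketch (assuming a CarbonTable instance is at hand):

    TableInfo tableInfo = carbonTable.getTableInfo();
    if (!tableInfo.isSchemaModified()) {
      // cached structures built from the original schema (e.g. SegmentProperties)
      // can be reused safely
    }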

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index bc4a90d..41ce31c 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -68,11 +68,11 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
     if (segment.getSegmentFileName() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
-      indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+      indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
     } else {
       SegmentFileStore fileStore =
           new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-      indexFiles = fileStore.getIndexFiles();
+      indexFiles = fileStore.getIndexOrMergeFiles();
     }
     return indexFiles;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
new file mode 100644
index 0000000..0d28b9f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+public class BlockletDataMapUtil {
+
+  public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
+      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
+      Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
+      throws IOException {
+    if (identifier.getMergeIndexFileName() != null
+        && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getMergeIndexFileName());
+      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
+        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
+        filesRead.add(indexMergeFile.getPath());
+      }
+    }
+    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getIndexFileName()) });
+    }
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
+    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
+    for (DataFileFooter footer : indexInfo) {
+      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
+      if (null == blockMetaInfoMap.get(blockPath)) {
+        blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
+      }
+      }
+    }
+    return blockMetaInfoMap;
+  }
+
+  /**
+   * This method will create a file name to block meta info mapping. This reduces the
+   * number of namenode calls, since a single namenode call fetches 1000 entries
+   *
+   * @param segmentFilePath
+   * @return
+   * @throws IOException
+   */
+  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
+      String segmentFilePath) throws IOException {
+    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
+    if (carbonFile instanceof AbstractDFSCarbonFile) {
+      PathFilter pathFilter = new PathFilter() {
+        @Override public boolean accept(Path path) {
+          return CarbonTablePath.isCarbonDataFile(path.getName());
+        }
+      };
+      CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
+      for (CarbonFile file : carbonFiles) {
+        String[] location = file.getLocations();
+        long len = file.getSize();
+        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
+        fileNameToMetaInfoMapping.put(file.getPath().toString(), blockMetaInfo);
+      }
+    }
+    return fileNameToMetaInfoMapping;
+  }
+
+  private static BlockMetaInfo createBlockMetaInfo(
+      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, String carbonDataFile) {
+    FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
+    switch (fileType) {
+      case LOCAL:
+        CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile, fileType);
+        return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
+      default:
+        return fileNameToMetaInfoMapping.get(carbonDataFile);
+    }
+  }
+
+  public static Set<TableBlockIndexUniqueIdentifier> getTableBlockUniqueIdentifiers(Segment segment)
+      throws IOException {
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>();
+    Map<String, String> indexFiles = segment.getCommittedIndexFile();
+    for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+      Path indexFile = new Path(indexFileEntry.getKey());
+      tableBlockIndexUniqueIdentifiers.add(
+          new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), indexFile.getName(),
+              indexFileEntry.getValue(), segment.getSegmentNo()));
+    }
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+  /**
+   * This method will find the TableBlockIndexUniqueIdentifier that belongs to the given distributable
+   *
+   * @param tableBlockIndexUniqueIdentifiers
+   * @param distributable
+   * @return
+   */
+  public static TableBlockIndexUniqueIdentifier filterIdentifiersBasedOnDistributable(
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
+      BlockletDataMapDistributable distributable) {
+    TableBlockIndexUniqueIdentifier validIdentifier = null;
+    String fileName = CarbonTablePath.DataFileUtil.getFileName(distributable.getFilePath());
+    for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+        tableBlockIndexUniqueIdentifiers) {
+      if (fileName.equals(tableBlockIndexUniqueIdentifier.getIndexFileName())) {
+        validIdentifier = tableBlockIndexUniqueIdentifier;
+        break;
+      }
+    }
+    return validIdentifier;
+  }
+
+  /**
+   * This method will return the TableBlockIndexUniqueIdentifiers of the index files inside a merge index file
+   *
+   * @param identifier
+   * @return
+   * @throws IOException
+   */
+  public static List<TableBlockIndexUniqueIdentifier> getIndexFileIdentifiersFromMergeFile(
+      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore segmentIndexFileStore)
+      throws IOException {
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+    String mergeFilePath =
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName();
+    segmentIndexFileStore.readMergeFile(mergeFilePath);
+    List<String> indexFiles =
+        segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath);
+    for (String indexFile : indexFiles) {
+      tableBlockIndexUniqueIdentifiers.add(
+          new TableBlockIndexUniqueIdentifier(identifier.getIndexFilePath(), indexFile,
+              identifier.getIndexFileName(), identifier.getSegmentId()));
+    }
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+}
\ No newline at end of file
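
Taken together, the intended flow appears to be: build the file-to-meta mapping once per segment, then resolve every index file against it so the per-block lookups stay in memory instead of hitting the namenode. A hedged sketch (path hypothetical, exception handling omitted, "segment" assumed to be a committed Segment):

    Map<String, BlockMetaInfo> metaInfoMap = BlockletDataMapUtil
        .createCarbonDataFileBlockMetaInfoMapping("/store/default/t1/Fact/Part0/Segment_0");
    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
    Set<String> filesRead = new HashSet<>();
    for (TableBlockIndexUniqueIdentifier id :
        BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment)) {
      Map<String, BlockMetaInfo> blockMetaInfo =
          BlockletDataMapUtil.getBlockMetaInfoMap(id, indexFileStore, filesRead, metaInfoMap);
    }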

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 8544da9..3823aef 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -219,6 +219,11 @@ public class SessionParams implements Serializable, Cloneable {
             throw new InvalidConfigurationException(
                 String.format("Invalid configuration of %s, datamap does not exist", key));
           }
+        } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+          isValid = CarbonUtil.validateBoolean(value);
+          if (!isValid) {
+            throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
+          }
         } else {
           throw new InvalidConfigurationException(
               "The key " + key + " not supported for dynamic configuration.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index b9f4838..62192ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -533,7 +533,7 @@ public class CarbonTablePath {
     /**
      * Return the file name from file path
      */
-    private static String getFileName(String dataFilePath) {
+    public static String getFileName(String dataFilePath) {
      int endIndex = dataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
       if (endIndex > -1) {
         return dataFilePath.substring(endIndex + 1, dataFilePath.length());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
new file mode 100644
index 0000000..dfbdd29
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+
+import mockit.Deencapsulation;
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestBlockletDataMapFactory {
+
+  private CarbonTable carbonTable;
+
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  private TableInfo tableInfo;
+
+  private BlockletDataMapFactory blockletDataMapFactory;
+
+  private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
+
+  private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
+
+  @Before public void setUp()
+      throws ClassNotFoundException, IllegalAccessException, InvocationTargetException,
+      InstantiationException {
+    tableInfo = new TableInfo();
+    Constructor<?> constructor =
+        Class.forName("org.apache.carbondata.core.metadata.schema.table.CarbonTable")
+            .getDeclaredConstructors()[0];
+    constructor.setAccessible(true);
+    carbonTable = (CarbonTable) constructor.newInstance();
+    absoluteTableIdentifier =
+        AbsoluteTableIdentifier.from("/opt/store/default/carbon_table/", "default", "carbon_table");
+    Deencapsulation.setField(tableInfo, "identifier", absoluteTableIdentifier);
+    Deencapsulation.setField(carbonTable, "tableInfo", tableInfo);
+    blockletDataMapFactory = new BlockletDataMapFactory(carbonTable, new DataMapSchema());
+    Deencapsulation.setField(blockletDataMapFactory, "cache",
+        CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP));
+    tableBlockIndexUniqueIdentifier =
+        new TableBlockIndexUniqueIdentifier("/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+            "0_batchno0-0-1521012756709.carbonindex", null, "0");
+    cache = CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
+  }
+
+  @Test public void addDataMapToCache()
+      throws IOException, MemoryException, NoSuchMethodException, InvocationTargetException,
+      IllegalAccessException {
+    List<BlockletDataMap> dataMaps = new ArrayList<>();
+    Method method = BlockletDataMapFactory.class
+        .getDeclaredMethod("cache", TableBlockIndexUniqueIdentifier.class,
+            BlockletDataMapIndexWrapper.class);
+    method.setAccessible(true);
+    method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifier,
+        new BlockletDataMapIndexWrapper(dataMaps));
+    BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
+    assert null != result;
+  }
+
+  @Test public void getValidDistributables() throws IOException {
+    BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable(
+        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex");
+    Segment segment = new Segment("0", null);
+    blockletDataMapDistributable.setSegment(segment);
+    BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable(
+        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756701.carbonindex");
+    blockletDataMapDistributable1.setSegment(segment);
+    List<DataMapDistributable> dataMapDistributables = new ArrayList<>(2);
+    dataMapDistributables.add(blockletDataMapDistributable);
+    dataMapDistributables.add(blockletDataMapDistributable1);
+    new MockUp<BlockletDataMapFactory>() {
+      @Mock Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+          Segment segment) {
+        TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier1 =
+            new TableBlockIndexUniqueIdentifier(
+                "/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+                "0_batchno0-0-1521012756701.carbonindex", null, "0");
+        Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>(3);
+        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
+        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier1);
+        return tableBlockIndexUniqueIdentifiers;
+      }
+    };
+    List<DataMapDistributable> validDistributables =
+        blockletDataMapFactory.getAllUncachedDistributables(dataMapDistributables);
+    assert 1 == validDistributables.size();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 8be1e2e..32af8d3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -16,21 +16,39 @@
  */
 package org.apache.carbondata.hadoop;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
  * CacheClient : Holds all the Cache access clients for Btree, Dictionary
  */
 public class CacheClient {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CacheClient.class.getName());
+
+  private final Object lock = new Object();
+
   // segment access client for driver LRU cache
   private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
       segmentAccessClient;
 
+  private static Map<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>
+      segmentProperties = new ConcurrentHashMap<>();
+
   public CacheClient() {
     Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
         CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE);
@@ -45,4 +63,35 @@ public class CacheClient {
   public void close() {
     segmentAccessClient.close();
   }
+
+  /**
+   * Method to get the segment properties and avoid construction of new segment properties
+   * as long as the schema is not modified
+   *
+   * @param tableIdentifier
+   * @param columnsInTable
+   * @param columnCardinality
+   */
+  public SegmentProperties getSegmentProperties(AbsoluteTableIdentifier tableIdentifier,
+      List<ColumnSchema> columnsInTable, int[] columnCardinality) {
+    SegmentTaskIndexStore.SegmentPropertiesWrapper segmentPropertiesWrapper =
+        new SegmentTaskIndexStore.SegmentPropertiesWrapper(tableIdentifier, columnsInTable,
+            columnCardinality);
+    SegmentProperties segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
+    if (null == segmentProperties) {
+      synchronized (lock) {
+        segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
+        if (null == segmentProperties) {
+          // create the metadata details; this will be useful in query handling.
+          // all the data file metadata will have common segment properties, so we
+          // can use the first one to create the segment properties
+          LOGGER.info("Constructing new SegmentProperties");
+          segmentProperties = new SegmentProperties(columnsInTable, columnCardinality);
+          this.segmentProperties.put(segmentPropertiesWrapper, segmentProperties);
+        }
+      }
+    }
+    return segmentProperties;
+  }
 }
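
The lookup is a classic double-checked pattern: a lock-free read of the ConcurrentHashMap first, then a second read under the lock before constructing, so concurrent queries on an unmodified schema share one SegmentProperties instance. A hedged caller sketch (identifier, schema list and cardinality assumed to be in hand):

    CacheClient client = new CacheClient();
    SegmentProperties props =
        client.getSegmentProperties(tableIdentifier, columnsInTable, columnCardinality);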

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
new file mode 100644
index 0000000..6835184
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * abstract class for data map job
+ */
+public abstract class AbstractDataMapJob implements DataMapJob {
+
+  @Override public void execute(CarbonTable carbonTable,
+      FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
+  }
+
+  @Override public List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
+      FilterResolverIntf resolverIntf) {
+    return null;
+  }
+}
\ No newline at end of file
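
The adapter exists so concrete jobs override only the variant they support. A hedged sketch of a minimal subclass (mirroring what SparkDataMapJob does further down; MyDataMapJob is hypothetical):

    public class MyDataMapJob extends AbstractDataMapJob {
      @Override public List<ExtendedBlocklet> execute(DistributableDataMapFormat format,
          FilterResolverIntf filter) {
        // a real implementation would run the pruning as a distributed job
        return new java.util.ArrayList<ExtendedBlocklet>();
      }
    }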

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 2af147d..0e8cf6a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -67,7 +67,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
 
-  protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+  public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
     CarbonTable carbonTableTemp;
     if (carbonTable == null) {
      // carbon table should be created either from deserialized table info (schema saved in

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 25cc228..a5d4df2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -156,7 +156,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  protected abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
+  public abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
       throws IOException;
 
  public static void setTablePath(Configuration configuration, String tablePath) {
@@ -172,7 +172,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
   }
 
-  public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+  public static void setDataMapJob(Configuration configuration, Object dataMapJob)
       throws IOException {
     if (dataMapJob != null) {
       String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
@@ -466,15 +466,30 @@ m filterExpression
   private List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
       FilterResolverIntf resolver, List<Segment> segmentIds, DataMapExprWrapper dataMapExprWrapper,
       DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune) throws IOException {
-    DistributableDataMapFormat datamapDstr =
-        new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
-            partitionsToPrune, BlockletDataMapFactory.class.getName());
-    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+    String className = "org.apache.carbondata.hadoop.api.DistributableDataMapFormat";
+    FileInputFormat dataMapFormat =
+        createDataMapJob(carbonTable, dataMapExprWrapper, segmentIds, partitionsToPrune, className);
+    List<ExtendedBlocklet> prunedBlocklets =
+        dataMapJob.execute((DistributableDataMapFormat) dataMapFormat, resolver);
     // Apply expression on the blocklets.
     prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
     return prunedBlocklets;
   }
 
+
+  public static FileInputFormat createDataMapJob(CarbonTable carbonTable,
+      DataMapExprWrapper dataMapExprWrapper, List<Segment> segments,
+      List<PartitionSpec> partitionsToPrune, String clsName) {
+    try {
+      Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0];
+      return (FileInputFormat) cons
+          .newInstance(carbonTable, dataMapExprWrapper, segments, partitionsToPrune,
+              BlockletDataMapFactory.class.getName());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Prune the segments from the already pruned blocklets.
    * @param segments
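
Note that the reflective construction in createDataMapJob assumes DistributableDataMapFormat declares exactly one constructor whose parameters line up with the supplied arguments; it replaces the direct call shown in the removed lines, i.e. roughly:

    // equivalent direct construction (the compile-time-coupled form this change avoids)
    FileInputFormat dataMapFormat = new DistributableDataMapFormat(carbonTable,
        dataMapExprWrapper, segmentIds, partitionsToPrune, BlockletDataMapFactory.class.getName());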

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index f93be63..cd34bb8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -103,7 +103,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+  public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
     if (carbonTable == null) {
      // carbon table should be created either from deserialized table info (schema saved in
       // hive metastore) or by reading schema in HDFS (schema saved in HDFS)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
index 64936aa..c439219 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -19,15 +19,21 @@ package org.apache.carbondata.hadoop.api;
 import java.io.Serializable;
 import java.util.List;
 
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
 /**
 * Distributable datamap job to execute the #DistributableDataMapFormat in cluster. It prunes the
 * datamaps distributably and returns the final blocklet list
  */
 public interface DataMapJob extends Serializable {
 
+  void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletDataMapIndexWrapper> format);
+
   List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
       FilterResolverIntf filter);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 36c7414..3208a28 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -22,6 +22,8 @@ import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Locale;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -49,6 +51,12 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  */
 public class CarbonInputFormatUtil {
 
+  /**
+   * Attribute for Carbon LOGGER.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonProperties.class.getName());
+
   public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
       AbsoluteTableIdentifier identifier,
       Job job) throws IOException {
@@ -58,6 +66,7 @@ public class CarbonInputFormatUtil {
     CarbonTableInputFormat.setTableName(
         job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    setDataMapJobIfConfigured(job.getConfiguration());
     return carbonInputFormat;
   }
 
@@ -71,6 +80,7 @@ public class CarbonInputFormatUtil {
     CarbonTableInputFormat.setTableName(
         job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    setDataMapJobIfConfigured(job.getConfiguration());
     return carbonTableInputFormat;
   }
 
@@ -108,11 +118,10 @@ public class CarbonInputFormatUtil {
     CarbonInputFormat.setQuerySegment(conf, identifier);
     CarbonInputFormat.setFilterPredicates(conf, filterExpression);
     CarbonInputFormat.setColumnProjection(conf, columnProjection);
-    if (dataMapJob != null &&
-        Boolean.valueOf(CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT))) {
+    if (dataMapJob != null) {
       CarbonInputFormat.setDataMapJob(conf, dataMapJob);
+    } else {
+      setDataMapJobIfConfigured(conf);
     }
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
@@ -147,6 +156,32 @@ public class CarbonInputFormatUtil {
     return format;
   }
 
+  /**
+   * This method sets the DataMapJob if it is configured
+   *
+   * @param conf
+   * @throws IOException
+   */
+  public static void setDataMapJobIfConfigured(Configuration conf) throws IOException {
+    String className = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
+    CarbonTableInputFormat.setDataMapJob(conf, createDataMapJob(className));
+  }
+
+  /**
+   * Creates instance for the DataMap Job class
+   *
+   * @param className
+   * @return
+   */
+  public static Object createDataMapJob(String className) {
+    try {
+      return Class.forName(className).getDeclaredConstructors()[0].newInstance();
+    } catch (Exception e) {
+      LOGGER.error(e);
+      return null;
+    }
+  }
+
   public static String createJobTrackerID(java.util.Date date) {
     return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(date);
   }
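
createDataMapJob loads SparkDataMapJob reflectively, so this hadoop module never takes a compile-time dependency on the spark-common module; when the class is absent the method logs the error and returns null, and setDataMapJob simply skips a null job. A hedged sketch of the resulting behavior:

    Object dataMapJob =
        CarbonInputFormatUtil.createDataMapJob("org.apache.carbondata.spark.rdd.SparkDataMapJob");
    if (dataMapJob != null) {
      // distributed datamap pruning will be used for this query
      CarbonTableInputFormat.setDataMapJob(conf, dataMapJob);
    }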

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 17c57b6..b0a59d4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -41,6 +41,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
         CarbonEnv.getDatabaseLocation("lucene", sqlContext.sparkSession))
+      .addProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, "true")
     sql("use lucene")
     sql("DROP TABLE IF EXISTS normal_test")
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 0d960d0..24a6927 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -56,6 +56,7 @@ import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
@@ -554,12 +555,7 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setQuerySegment(conf, identifier)
     CarbonInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonInputFormat.setColumnProjection(conf, columnProjection)
-    CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    if (CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
-      CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    }
+    CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
 
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -599,12 +595,7 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setQuerySegment(conf, identifier)
     CarbonInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonInputFormat.setColumnProjection(conf, columnProjection)
-    CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    if (CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
-      CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    }
+    CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 8d2f9ee..f51c3bc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -29,12 +29,12 @@ import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledExcepti
 
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
+import org.apache.carbondata.hadoop.api.{AbstractDataMapJob, DistributableDataMapFormat}
 
 /**
  * Spark job to execute datamap job and prune all the datamaps distributable
  */
-class SparkDataMapJob extends DataMapJob {
+class SparkDataMapJob extends AbstractDataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat,
       filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 488a53d..6622246 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 
 case class CarbonCountStar(
     attributesRaw: Seq[Attribute],
@@ -74,11 +75,13 @@ case class CarbonCountStar(
     val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
     val jobConf: JobConf = new JobConf(new Configuration)
     SparkHadoopUtil.get.addCredentials(jobConf)
+    CarbonInputFormat.setTableInfo(jobConf, carbonTable.getTableInfo)
     val job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
     CarbonInputFormat
       .setTransactionalTable(job.getConfiguration,
         carbonTable.getTableInfo.isTransactionalTable)
+    CarbonInputFormatUtil.setDataMapJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index cce23dc..29dcec9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -115,6 +115,15 @@ object CarbonSetCommand {
           "\" carbon.datamap.visible.<database_name>.<table_name>.<database_name>" +
           " = <true/false> \" format")
       }
+    } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+      if (key.split("\\.").length == 6) {
+        sessionParams.addProperty(key.toLowerCase(), value)
+      }
+      else {
+        throw new MalformedCarbonCommandException(
+          "property should be in \" carbon.load.datamaps.parallel.<database_name>" +
+          ".<table_name>=<true/false> \" format.")
+      }
     }
   }
 
