[CARBONDATA-2909] Multi user support for SDK on S3

Added support for multiple users with different SK/AK to write concurrently to 
S3.
Made it mandatory for the user to provide a Hadoop configuration when creating an SDK 
writer/reader.
Passed the Hadoop configuration down to the core layer so that FileFactory can access it.
Fixed various SK/AK not found exceptions in CarbonSparkFileFormat.

This closes #2678


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8f1a029b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8f1a029b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8f1a029b

Branch: refs/heads/master
Commit: 8f1a029b9ad82cb3d1972e63be2055c84895661b
Parents: 7c827c0
Author: kunal642 <[email protected]>
Authored: Fri Aug 31 11:12:30 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Tue Sep 11 17:46:05 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapUtil.java    | 10 ++-
 .../apache/carbondata/core/datamap/Segment.java | 11 +++
 .../carbondata/core/datamap/dev/DataMap.java    |  3 +-
 .../core/datamap/dev/DataMapModel.java          | 12 ++-
 .../filesystem/AbstractDFSCarbonFile.java       | 18 +++--
 .../core/datastore/impl/FileFactory.java        |  5 ++
 .../indexstore/BlockletDataMapIndexStore.java   | 24 ++++--
 .../indexstore/BlockletDataMapIndexWrapper.java | 14 +++-
 .../TableBlockIndexUniqueIdentifierWrapper.java | 19 +++++
 .../indexstore/blockletindex/BlockDataMap.java  |  6 +-
 .../blockletindex/BlockletDataMapFactory.java   |  2 +-
 .../blockletindex/BlockletDataMapModel.java     | 12 +--
 .../blockletindex/SegmentIndexFileStore.java    | 37 ++++++---
 .../core/metadata/SegmentFileStore.java         | 22 +++---
 .../core/metadata/schema/SchemaReader.java      | 19 ++++-
 .../core/mutate/CarbonUpdateUtil.java           |  6 +-
 .../LatestFilesReadCommittedScope.java          | 25 +++++--
 .../core/readcommitter/ReadCommittedScope.java  |  6 ++
 .../TableStatusReadCommittedScope.java          | 19 ++++-
 .../core/reader/CarbonHeaderReader.java         | 13 +++-
 .../core/reader/CarbonIndexFileReader.java      | 12 ++-
 .../carbondata/core/reader/ThriftReader.java    | 20 ++++-
 .../statusmanager/SegmentStatusManager.java     | 23 +++---
 .../util/AbstractDataFileFooterConverter.java   | 15 +++-
 .../core/util/BlockletDataMapUtil.java          | 12 +--
 .../apache/carbondata/core/util/CarbonUtil.java | 47 +++---------
 .../core/util/DataFileFooterConverter.java      | 10 +++
 .../core/util/DataFileFooterConverter2.java     | 13 +++-
 .../core/util/DataFileFooterConverterV3.java    | 11 +++
 .../core/writer/CarbonIndexFileMergeWriter.java |  4 +-
 .../TestBlockletDataMapFactory.java             |  8 +-
 .../carbondata/core/util/CarbonUtilTest.java    |  4 +-
 .../core/util/DataFileFooterConverterTest.java  |  3 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java   |  5 +-
 .../datamap/bloom/BloomDataMapModel.java        |  7 +-
 .../lucene/LuceneFineGrainDataMapFactory.java   |  7 +-
 docs/sdk-guide.md                               | 21 ++++--
 .../examples/sdk/CarbonReaderExample.java       |  7 +-
 .../carbondata/examples/sdk/SDKS3Example.java   |  6 +-
 .../carbondata/examples/DirectSQLExample.scala  |  8 +-
 .../carbondata/examples/S3UsingSDkExample.scala |  5 +-
 .../hadoop/api/CarbonFileInputFormat.java       |  7 +-
 .../hadoop/api/CarbonOutputCommitter.java       |  8 +-
 .../hadoop/api/CarbonTableInputFormat.java      | 10 ++-
 .../hadoop/api/CarbonTableOutputFormat.java     |  3 +
 .../sdv/generated/SDKwriterTestCase.scala       | 11 +--
 ...ithColumnMetCacheAndCacheLevelProperty.scala |  5 +-
 ...FileInputFormatWithExternalCarbonTable.scala |  6 +-
 .../TestNonTransactionalCarbonTable.scala       | 49 +++++++-----
 ...tNonTransactionalCarbonTableJsonWriter.scala |  3 +-
 ...ansactionalCarbonTableWithAvroDataType.scala | 58 +++++++-------
 ...ransactionalCarbonTableWithComplexType.scala |  9 ++-
 ...tSparkCarbonFileFormatWithSparkSession.scala |  7 +-
 .../dataload/TestDataLoadWithFileName.scala     |  5 +-
 .../dataload/TestGlobalSortDataLoad.scala       |  3 +-
 .../testsuite/datamap/CGDataMapTestCase.scala   |  8 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |  8 +-
 .../testsuite/datamap/TestDataMapCommand.scala  |  4 +-
 .../TestDataLoadingForPartitionTable.scala      |  3 +-
 .../StandardPartitionTableLoadingTestCase.scala |  3 +-
 .../execution/datasources/CarbonFileIndex.scala |  2 +-
 .../datasources/SparkCarbonFileFormat.scala     |  8 +-
 .../datasource/SparkCarbonDataSourceTest.scala  |  2 +-
 ...tCreateTableUsingSparkCarbonFileFormat.scala | 20 ++---
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../command/mutation/DeleteExecution.scala      |  5 +-
 .../processing/merger/CarbonDataMergerUtil.java |  2 +-
 .../processing/util/CarbonLoaderUtil.java       |  2 +-
 .../carbondata/sdk/file/AvroCarbonWriter.java   |  3 +-
 .../carbondata/sdk/file/CSVCarbonWriter.java    |  3 +-
 .../sdk/file/CarbonReaderBuilder.java           |  5 +-
 .../sdk/file/CarbonWriterBuilder.java           | 38 ++++------
 .../carbondata/sdk/file/JsonCarbonWriter.java   |  8 +-
 .../sdk/file/AvroCarbonWriterTest.java          | 14 ++--
 .../sdk/file/CSVCarbonWriterTest.java           |  8 +-
 .../CSVNonTransactionalCarbonWriterTest.java    |  4 +-
 .../carbondata/sdk/file/CarbonReaderTest.java   | 79 ++++++++++----------
 .../sdk/file/ConcurrentAvroSdkWriterTest.java   |  6 +-
 .../sdk/file/ConcurrentSdkWriterTest.java       |  5 +-
 .../apache/carbondata/sdk/file/TestUtil.java    |  5 +-
 .../store/worker/SearchRequestHandler.java      |  3 +-
 81 files changed, 616 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index e015052..60c5233 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -99,7 +100,7 @@ public class DataMapUtil {
     }
     String className = 
"org.apache.carbondata.core.datamap.DistributableDataMapFormat";
     SegmentStatusManager.ValidAndInvalidSegmentsInfo 
validAndInvalidSegmentsInfo =
-        getValidAndInvalidSegments(carbonTable);
+        getValidAndInvalidSegments(carbonTable, 
FileFactory.getConfiguration());
     List<Segment> validSegments = 
validAndInvalidSegmentsInfo.getValidSegments();
     List<Segment> invalidSegments = 
validAndInvalidSegmentsInfo.getInvalidSegments();
     DataMapExprWrapper dataMapExprWrapper = null;
@@ -140,7 +141,7 @@ public class DataMapUtil {
       List<PartitionSpec> partitionsToPrune) throws IOException {
     String className = 
"org.apache.carbondata.core.datamap.DistributableDataMapFormat";
     SegmentStatusManager.ValidAndInvalidSegmentsInfo 
validAndInvalidSegmentsInfo =
-        getValidAndInvalidSegments(carbonTable);
+        getValidAndInvalidSegments(carbonTable, 
validSegments.get(0).getConfiguration());
     List<Segment> invalidSegments = 
validAndInvalidSegmentsInfo.getInvalidSegments();
     DistributableDataMapFormat dataMapFormat =
         createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, 
invalidSegments,
@@ -152,8 +153,9 @@ public class DataMapUtil {
   }
 
   private static SegmentStatusManager.ValidAndInvalidSegmentsInfo 
getValidAndInvalidSegments(
-      CarbonTable carbonTable) throws IOException {
-    SegmentStatusManager ssm = new 
SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+      CarbonTable carbonTable, Configuration configuration) throws IOException 
{
+    SegmentStatusManager ssm =
+        new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), 
configuration);
     return ssm.getValidAndInvalidSegments();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 30e811a..85445eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -32,6 +32,8 @@ import 
org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Represents one load of carbondata
  */
@@ -64,6 +66,11 @@ public class Segment implements Serializable {
     this.segmentNo = segmentNo;
   }
 
+  public Segment(String segmentNo, ReadCommittedScope readCommittedScope) {
+    this.segmentNo = segmentNo;
+    this.readCommittedScope = readCommittedScope;
+  }
+
   /**
    * ReadCommittedScope will be null. So getCommittedIndexFile will not work 
and will throw
    * a NullPointerException. In case getCommittedIndexFile is need to be 
accessed then
@@ -202,6 +209,10 @@ public class Segment implements Serializable {
     return null;
   }
 
+  public Configuration getConfiguration() {
+    return readCommittedScope.getConfiguration();
+  }
+
   public Set<String> getFilteredIndexShardNames() {
     return filteredIndexShardNames;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 456776b..47eeafe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -37,7 +37,8 @@ public interface DataMap<T extends Blocklet> {
   /**
    * It is called to load the data map to memory or to initialize it.
    */
-  void init(DataMapModel dataMapModel) throws MemoryException, IOException;
+  void init(DataMapModel dataMapModel)
+      throws MemoryException, IOException;
 
   /**
    * Prune the datamap with resolved filter expression and partition 
information.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
index 76bbeee..5f4d1dd 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datamap.dev;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Information required to build datamap
  */
@@ -24,11 +26,19 @@ public class DataMapModel {
 
   private String filePath;
 
-  public DataMapModel(String filePath) {
+  private Configuration configuration;
+
+  public DataMapModel(String filePath, Configuration configuration) {
     this.filePath = filePath;
+    this.configuration = configuration;
   }
 
   public String getFilePath() {
     return filePath;
   }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 5128022..41215a3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -282,7 +282,12 @@ public abstract class AbstractDFSCarbonFile implements 
CarbonFile {
   @Override public DataInputStream getDataInputStream(String path, 
FileFactory.FileType fileType,
       int bufferSize, Configuration hadoopConf) throws IOException {
     return getDataInputStream(path, fileType, bufferSize,
-        CarbonUtil.inferCompressorFromFileName(path));
+        CarbonUtil.inferCompressorFromFileName(path), hadoopConf);
+  }
+
+  @Override public DataInputStream getDataInputStream(String path, 
FileFactory.FileType fileType,
+      int bufferSize, String compressor) throws IOException {
+    return getDataInputStream(path, fileType, bufferSize, 
FileFactory.getConfiguration());
   }
 
   /**
@@ -305,12 +310,12 @@ public abstract class AbstractDFSCarbonFile implements 
CarbonFile {
     return new DataInputStream(new BufferedInputStream(stream));
   }
 
-  @Override public DataInputStream getDataInputStream(String path, 
FileFactory.FileType fileType,
-      int bufferSize, String compressor) throws IOException {
+  private DataInputStream getDataInputStream(String path, FileFactory.FileType 
fileType,
+      int bufferSize, String compressor, Configuration configuration) throws 
IOException {
     path = path.replace("\\", "/");
     Path pt = new Path(path);
     InputStream inputStream;
-    FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+    FileSystem fs = pt.getFileSystem(configuration);
     if (bufferSize <= 0) {
       inputStream = fs.open(pt);
     } else {
@@ -509,7 +514,7 @@ public abstract class AbstractDFSCarbonFile implements 
CarbonFile {
     RemoteIterator<LocatedFileStatus> listStatus = null;
     if (null != fileStatus && fileStatus.isDirectory()) {
       Path path = fileStatus.getPath();
-      listStatus = 
path.getFileSystem(FileFactory.getConfiguration()).listFiles(path, recursive);
+      listStatus = fs.listFiles(path, recursive);
     } else {
       return new ArrayList<CarbonFile>();
     }
@@ -521,8 +526,7 @@ public abstract class AbstractDFSCarbonFile implements 
CarbonFile {
     if (null != fileStatus && fileStatus.isDirectory()) {
       List<FileStatus> listStatus = new ArrayList<>();
       Path path = fileStatus.getPath();
-      RemoteIterator<LocatedFileStatus> iter =
-          
path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path);
+      RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
       while (iter.hasNext()) {
         LocatedFileStatus fileStatus = iter.next();
         if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 
0) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index b462e0c..e8f6cfb 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -111,6 +111,11 @@ public final class FileFactory {
     return getDataInputStream(path, fileType, -1);
   }
 
+  public static DataInputStream getDataInputStream(String path, FileType 
fileType,
+      Configuration configuration) throws IOException {
+    return getDataInputStream(path, fileType, -1, configuration);
+  }
+
   public static DataInputStream getDataInputStream(String path, FileType 
fileType, int bufferSize)
       throws IOException {
     return getDataInputStream(path, fileType, bufferSize, getConfiguration());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index fa84f30..323899e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -40,6 +40,8 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Class to handle loading, unloading,clearing,storing of the table
  * blocks
@@ -87,7 +89,8 @@ public class BlockletDataMapIndexStore
     List<BlockDataMap> dataMaps = new ArrayList<>();
     if (blockletDataMapIndexWrapper == null) {
       try {
-        SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+        SegmentIndexFileStore indexFileStore =
+            new SegmentIndexFileStore(identifierWrapper.getConfiguration());
         Set<String> filesRead = new HashSet<>();
         String segmentFilePath = identifier.getIndexFilePath();
         if (segInfoCache == null) {
@@ -97,7 +100,8 @@ public class BlockletDataMapIndexStore
             segInfoCache.get(segmentFilePath);
         if (carbonDataFileBlockMetaInfoMapping == null) {
           carbonDataFileBlockMetaInfoMapping =
-              
BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+              
BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
+                  identifierWrapper.getConfiguration());
           segInfoCache.put(segmentFilePath, 
carbonDataFileBlockMetaInfoMapping);
         }
         // if the identifier is not a merge file we can directly load the 
datamaps
@@ -107,10 +111,12 @@ public class BlockletDataMapIndexStore
                   carbonDataFileBlockMetaInfoMapping);
           BlockDataMap blockletDataMap =
               loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
-                  identifierWrapper.getCarbonTable(), 
identifierWrapper.isAddTableBlockToUnsafe());
+                  identifierWrapper.getCarbonTable(), 
identifierWrapper.isAddTableBlockToUnsafe(),
+                  identifierWrapper.getConfiguration());
           dataMaps.add(blockletDataMap);
           blockletDataMapIndexWrapper =
-              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), 
dataMaps);
+              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), 
dataMaps,
+                  identifierWrapper.getConfiguration());
         } else {
           // if the identifier is a merge file then collect the index files 
and load the datamaps
           List<TableBlockIndexUniqueIdentifier> 
tableBlockIndexUniqueIdentifiers =
@@ -125,12 +131,14 @@ public class BlockletDataMapIndexStore
               BlockDataMap blockletDataMap =
                   loadAndGetDataMap(blockIndexUniqueIdentifier, 
indexFileStore, blockMetaInfoMap,
                       identifierWrapper.getCarbonTable(),
-                      identifierWrapper.isAddTableBlockToUnsafe());
+                      identifierWrapper.isAddTableBlockToUnsafe(),
+                      identifierWrapper.getConfiguration());
               dataMaps.add(blockletDataMap);
             }
           }
           blockletDataMapIndexWrapper =
-              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), 
dataMaps);
+              new BlockletDataMapIndexWrapper(identifier.getSegmentId(), 
dataMaps,
+                  identifierWrapper.getConfiguration());
         }
         lruCache.put(identifier.getUniqueTableSegmentIdentifier(), 
blockletDataMapIndexWrapper,
             blockletDataMapIndexWrapper.getMemorySize());
@@ -265,7 +273,7 @@ public class BlockletDataMapIndexStore
    */
   private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier 
identifier,
       SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> 
blockMetaInfoMap,
-      CarbonTable carbonTable, boolean addTableBlockToUnsafe)
+      CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration 
configuration)
       throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         identifier.getUniqueTableSegmentIdentifier();
@@ -279,7 +287,7 @@ public class BlockletDataMapIndexStore
       dataMap.init(new BlockletDataMapModel(carbonTable,
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR 
+ identifier
               .getIndexFileName(), 
indexFileStore.getFileData(identifier.getIndexFileName()),
-          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe));
+          blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, 
configuration));
     }
     return dataMap;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
index 2cf0259..7b8a13b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -24,19 +24,27 @@ import org.apache.carbondata.core.cache.Cacheable;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * A cacheable wrapper of datamaps
  */
 public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
 
+  private static final long serialVersionUID = -2859075086955465810L;
+
   private List<BlockDataMap> dataMaps;
 
   private String segmentId;
 
+  private transient Configuration configuration;
+
   // size of the wrapper. basically the total size of the datamaps this 
wrapper is holding
   private long wrapperSize;
 
-  public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> 
dataMaps) {
+  public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> 
dataMaps, Configuration
+      configuration) {
+    this.configuration = configuration;
     this.dataMaps = dataMaps;
     this.wrapperSize = 0L;
     this.segmentId = segmentId;
@@ -72,4 +80,8 @@ public class BlockletDataMapIndexWrapper implements 
Cacheable, Serializable {
   public String getSegmentId() {
     return segmentId;
   }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
index 0924f1f..77756de 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
@@ -19,8 +19,11 @@ package org.apache.carbondata.core.indexstore;
 
 import java.io.Serializable;
 
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Class holds reference to TableBlockIndexUniqueIdentifier and carbonTable 
related info
  * This is just a wrapper passed between methods like a context, This object 
must never be cached.
@@ -35,6 +38,8 @@ public class TableBlockIndexUniqueIdentifierWrapper 
implements Serializable {
 
   // holds the reference to CarbonTable
   private CarbonTable carbonTable;
+
+  private transient Configuration configuration;
   /**
    * flag to specify whether to load table block metadata in unsafe or safe. 
Default value is true
    */
@@ -44,6 +49,15 @@ public class TableBlockIndexUniqueIdentifierWrapper 
implements Serializable {
       TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, 
CarbonTable carbonTable) {
     this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
     this.carbonTable = carbonTable;
+    this.configuration = FileFactory.getConfiguration();
+  }
+
+  public TableBlockIndexUniqueIdentifierWrapper(
+      TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, 
CarbonTable carbonTable,
+      Configuration configuration) {
+    this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
+    this.carbonTable = carbonTable;
+    this.configuration = configuration;
   }
 
   // Note: The constructor is getting used in extensions with other 
functionalities.
@@ -53,6 +67,7 @@ public class TableBlockIndexUniqueIdentifierWrapper 
implements Serializable {
       boolean addTableBlockToUnsafe) {
     this(tableBlockIndexUniqueIdentifier, carbonTable);
     this.addTableBlockToUnsafe = addTableBlockToUnsafe;
+    this.configuration = FileFactory.getConfiguration();
   }
 
 
@@ -67,4 +82,8 @@ public class TableBlockIndexUniqueIdentifierWrapper 
implements Serializable {
   public boolean isAddTableBlockToUnsafe() {
     return addTableBlockToUnsafe;
   }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 2dbf6a0..57c92c6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -103,11 +103,13 @@ public class BlockDataMap extends CoarseGrainDataMap
    */
   protected boolean isFilePathStored;
 
-  @Override public void init(DataMapModel dataMapModel) throws IOException, 
MemoryException {
+  @Override public void init(DataMapModel dataMapModel)
+      throws IOException, MemoryException {
     long startTime = System.currentTimeMillis();
     assert (dataMapModel instanceof BlockletDataMapModel);
     BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) 
dataMapModel;
-    DataFileFooterConverter fileFooterConverter = new 
DataFileFooterConverter();
+    DataFileFooterConverter fileFooterConverter =
+        new DataFileFooterConverter(dataMapModel.getConfiguration());
     List<DataFileFooter> indexInfo = fileFooterConverter
         .getIndexInfo(blockletDataMapInfo.getFilePath(), 
blockletDataMapInfo.getFileData(),
             blockletDataMapInfo.getCarbonTable().isTransactionalTable());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index da2fa39..e16c3cd 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -137,7 +137,7 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
       for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : 
identifiers) {
         tableBlockIndexUniqueIdentifierWrappers.add(
             new 
TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
-                this.getCarbonTable()));
+                this.getCarbonTable(), segment.getConfiguration()));
       }
     }
     List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index 180c812..0a75d59 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -22,6 +22,8 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * It is the model object to keep the information to build or initialize 
BlockletDataMap.
  */
@@ -37,9 +39,9 @@ public class BlockletDataMapModel extends DataMapModel {
 
   private boolean addToUnsafe = true;
 
-  public BlockletDataMapModel(CarbonTable carbonTable, String filePath,
-      byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String 
segmentId) {
-    super(filePath);
+  public BlockletDataMapModel(CarbonTable carbonTable, String filePath, byte[] 
fileData,
+      Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, 
Configuration configuration) {
+    super(filePath, configuration);
     this.fileData = fileData;
     this.blockMetaInfoMap = blockMetaInfoMap;
     this.segmentId = segmentId;
@@ -48,8 +50,8 @@ public class BlockletDataMapModel extends DataMapModel {
 
   public BlockletDataMapModel(CarbonTable carbonTable, String filePath,
       byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String 
segmentId,
-      boolean addToUnsafe) {
-    this(carbonTable, filePath, fileData, blockMetaInfoMap, segmentId);
+      boolean addToUnsafe, Configuration configuration) {
+    this(carbonTable, filePath, fileData, blockMetaInfoMap, segmentId, 
configuration);
     this.addToUnsafe = addToUnsafe;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 25cfc26..f19c9c9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.MergedBlockIndex;
 import org.apache.carbondata.format.MergedBlockIndexHeader;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 
 /**
@@ -76,10 +77,20 @@ public class SegmentIndexFileStore {
    */
   private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
 
+  private Configuration configuration;
+
   public SegmentIndexFileStore() {
     carbonIndexMap = new HashMap<>();
     carbonIndexMapWithFullPath = new TreeMap<>();
     carbonMergeFileToIndexFilesMap = new HashMap<>();
+    configuration = FileFactory.getConfiguration();
+  }
+
+  public SegmentIndexFileStore(Configuration configuration) {
+    carbonIndexMap = new HashMap<>();
+    carbonIndexMapWithFullPath = new TreeMap<>();
+    carbonMergeFileToIndexFilesMap = new HashMap<>();
+    this.configuration = configuration;
   }
 
   /**
@@ -89,7 +100,7 @@ public class SegmentIndexFileStore {
    * @throws IOException
    */
   public void readAllIIndexOfSegment(String segmentPath) throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath, 
configuration);
     for (int i = 0; i < carbonIndexFiles.length; i++) {
       if 
(carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
         readMergeFile(carbonIndexFiles[i].getCanonicalPath());
@@ -155,7 +166,8 @@ public class SegmentIndexFileStore {
    * @throws IOException
    */
   public void readAllIndexAndFillBolckletInfo(String segmentPath) throws 
IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    CarbonFile[] carbonIndexFiles =
+        getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
     for (int i = 0; i < carbonIndexFiles.length; i++) {
       if 
(carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
         readMergeFile(carbonIndexFiles[i].getCanonicalPath());
@@ -190,7 +202,8 @@ public class SegmentIndexFileStore {
    * @throws IOException
    */
   public Map<String, String> getIndexFilesFromSegment(String segmentPath) 
throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    CarbonFile[] carbonIndexFiles =
+        getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
     Map<String, String> indexFiles = new HashMap<>();
     for (int i = 0; i < carbonIndexFiles.length; i++) {
       if 
(carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
@@ -216,7 +229,8 @@ public class SegmentIndexFileStore {
    */
   public Map<String, String> getMergeOrIndexFilesFromSegment(String 
segmentPath)
       throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    CarbonFile[] carbonIndexFiles =
+        getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
     Map<String, String> indexFiles = new HashMap<>();
     for (int i = 0; i < carbonIndexFiles.length; i++) {
       if 
(carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
@@ -251,7 +265,7 @@ public class SegmentIndexFileStore {
    * @throws IOException
    */
   public void readMergeFile(String mergeFilePath) throws IOException {
-    ThriftReader thriftReader = new ThriftReader(mergeFilePath);
+    ThriftReader thriftReader = new ThriftReader(mergeFilePath, configuration);
     try {
       thriftReader.open();
       MergedBlockIndexHeader indexHeader = 
readMergeBlockIndexHeader(thriftReader);
@@ -259,7 +273,7 @@ public class SegmentIndexFileStore {
       List<String> file_names = indexHeader.getFile_names();
       carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names);
       List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
-      CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
+      CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath, 
configuration);
       String mergeFileAbsolutePath = 
mergeFile.getParentFile().getAbsolutePath();
       assert (file_names.size() == fileData.size());
       for (int i = 0; i < file_names.size(); i++) {
@@ -282,8 +296,8 @@ public class SegmentIndexFileStore {
    */
   private void readIndexFile(CarbonFile indexFile) throws IOException {
     String indexFilePath = indexFile.getCanonicalPath();
-    DataInputStream dataInputStream =
-        FileFactory.getDataInputStream(indexFilePath, 
FileFactory.getFileType(indexFilePath));
+    DataInputStream dataInputStream = FileFactory
+        .getDataInputStream(indexFilePath, 
FileFactory.getFileType(indexFilePath), configuration);
     byte[] bytes = new byte[(int) indexFile.getSize()];
     try {
       dataInputStream.readFully(bytes);
@@ -362,8 +376,8 @@ public class SegmentIndexFileStore {
    * @param segmentPath
    * @return
    */
-  public static CarbonFile[] getCarbonIndexFiles(String segmentPath) {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+  public static CarbonFile[] getCarbonIndexFiles(String segmentPath, 
Configuration configuration) {
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, 
configuration);
     return carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
         return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || 
file.getName()
@@ -422,7 +436,8 @@ public class SegmentIndexFileStore {
       indexReader.openThriftReader(indexFile.getCanonicalPath());
       // get the index header
       org.apache.carbondata.format.IndexHeader indexHeader = 
indexReader.readIndexHeader();
-      DataFileFooterConverter fileFooterConverter = new 
DataFileFooterConverter();
+      DataFileFooterConverter fileFooterConverter =
+          new DataFileFooterConverter(FileFactory.getConfiguration());
       String filePath = 
FileFactory.getUpdatedFilePath(indexFile.getCanonicalPath());
       String parentPath =
           filePath.substring(0, 
filePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 1acf0ea..44a2f7e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -49,6 +49,7 @@ import 
org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -469,8 +470,8 @@ public class SegmentFileStore {
    * @readSegment method before calling it.
    * @throws IOException
    */
-  public void readIndexFiles() throws IOException {
-    readIndexFiles(SegmentStatus.SUCCESS, false);
+  public void readIndexFiles(Configuration configuration) throws IOException {
+    readIndexFiles(SegmentStatus.SUCCESS, false, configuration);
   }
 
   public SegmentFile getSegmentFile() {
@@ -484,8 +485,8 @@ public class SegmentFileStore {
    * @param ignoreStatus
    * @throws IOException
    */
-  private List<String> readIndexFiles(SegmentStatus status, boolean 
ignoreStatus)
-      throws IOException {
+  private List<String> readIndexFiles(SegmentStatus status, boolean 
ignoreStatus,
+      Configuration configuration) throws IOException {
     if (indexFilesMap != null) {
       return new ArrayList<>();
     }
@@ -494,7 +495,7 @@ public class SegmentFileStore {
     indexFilesMap = new HashMap<>();
     indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, 
ignoreStatus);
     Map<String, byte[]> carbonIndexMap = 
indexFileStore.getCarbonIndexMapWithFullPath();
-    DataFileFooterConverter fileFooterConverter = new 
DataFileFooterConverter();
+    DataFileFooterConverter fileFooterConverter = new 
DataFileFooterConverter(configuration);
     for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
       List<DataFileFooter> indexInfo =
           fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
@@ -538,8 +539,8 @@ public class SegmentFileStore {
     Map<String, byte[]> carbonIndexMap = 
indexFileStore.getCarbonIndexMapWithFullPath();
     DataFileFooterConverter fileFooterConverter = new 
DataFileFooterConverter();
     for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
-      List<DataFileFooter> indexInfo =
-          fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+      List<DataFileFooter> indexInfo = fileFooterConverter
+          .getIndexInfo(entry.getKey(), entry.getValue());
       if (indexInfo.size() > 0) {
         schemaMap.put(entry.getKey(), indexInfo.get(0).getColumnInTable());
       }
@@ -733,8 +734,8 @@ public class SegmentFileStore {
         // take the list of files from this segment.
         SegmentFileStore fileStore =
             new SegmentFileStore(table.getTablePath(), 
segment.getSegmentFile());
-        List<String> indexOrMergeFiles =
-            fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
+        List<String> indexOrMergeFiles = fileStore
+            .readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false, 
FileFactory.getConfiguration());
         if (forceDelete) {
           deletePhysicalPartition(
               partitionSpecs,
@@ -791,7 +792,8 @@ public class SegmentFileStore {
       List<PartitionSpec> partitionSpecs,
       SegmentUpdateStatusManager updateStatusManager) throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, 
segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
+    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+        FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
       FileFactory.deleteFile(entry.getKey(), 
FileFactory.getFileType(entry.getKey()));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index 57370f6..d0bc976 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -29,6 +29,8 @@ import 
org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * TODO: It should be removed after store manager implementation.
  */
@@ -81,10 +83,25 @@ public class SchemaReader {
   }
 
   public static TableInfo inferSchema(AbsoluteTableIdentifier identifier,
+      boolean isCarbonFileProvider, Configuration configuration) throws 
IOException {
+
+    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
+        .inferSchema(identifier.getTablePath(), identifier.getTableName(), 
isCarbonFileProvider,
+            configuration);
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    TableInfo wrapperTableInfo = 
schemaConverter.fromExternalToWrapperTableInfo(
+        tableInfo, identifier.getDatabaseName(), identifier.getTableName(),
+        identifier.getTablePath());
+    wrapperTableInfo.setTransactionalTable(false);
+    return wrapperTableInfo;
+  }
+
+  public static TableInfo inferSchema(AbsoluteTableIdentifier identifier,
       boolean isCarbonFileProvider) throws IOException {
 
     org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
-        .inferSchema(identifier.getTablePath(), identifier.getTableName(), 
isCarbonFileProvider);
+        .inferSchema(identifier.getTablePath(), identifier.getTableName(), 
isCarbonFileProvider,
+            FileFactory.getConfiguration());
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
     TableInfo wrapperTableInfo = 
schemaConverter.fromExternalToWrapperTableInfo(
         tableInfo, identifier.getDatabaseName(), identifier.getTableName(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java 
b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 7df3937..d52eeb2 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -228,7 +228,7 @@ public class CarbonUpdateUtil {
             }
 
             // if the segments is in the list of marked for delete then update 
the status.
-            if (segmentsToBeDeleted.contains(new 
Segment(loadMetadata.getLoadName(), null))) {
+            if (segmentsToBeDeleted.contains(new 
Segment(loadMetadata.getLoadName()))) {
               loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
               
loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp));
             }
@@ -391,7 +391,7 @@ public class CarbonUpdateUtil {
     List<String> dataFiles = new ArrayList<>();
     if (segment.getSegmentFileName() != null) {
       SegmentFileStore fileStore = new SegmentFileStore(tablePath, 
segment.getSegmentFileName());
-      fileStore.readIndexFiles();
+      fileStore.readIndexFiles(FileFactory.getConfiguration());
       Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
       List<String> dataFilePaths = new ArrayList<>();
       for (List<String> paths : indexFilesMap.values()) {
@@ -737,7 +737,7 @@ public class CarbonUpdateUtil {
     for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) {
 
       if (eachSeg.getValue() == 0) {
-        segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), null));
+        segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), ""));
       }
 
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index abd9c2c..9dafed9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -36,6 +36,8 @@ import 
org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * This is a readCommittedScope for non transactional carbon table
  */
@@ -43,10 +45,12 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 @InterfaceStability.Stable
 public class LatestFilesReadCommittedScope implements ReadCommittedScope {
 
+  private static final long serialVersionUID = -839970494288861816L;
   private String carbonFilePath;
   private String segmentId;
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;
+  private transient Configuration configuration;
 
   /**
    * a new constructor of this class
@@ -54,7 +58,9 @@ public class LatestFilesReadCommittedScope implements 
ReadCommittedScope {
    * @param path      carbon file path
    * @param segmentId segment id
    */
-  public LatestFilesReadCommittedScope(String path, String segmentId) throws 
IOException {
+  public LatestFilesReadCommittedScope(String path, String segmentId, 
Configuration configuration)
+      throws IOException {
+    this.configuration = configuration;
     Objects.requireNonNull(path);
     this.carbonFilePath = path;
     this.segmentId = segmentId;
@@ -66,8 +72,9 @@ public class LatestFilesReadCommittedScope implements 
ReadCommittedScope {
    *
    * @param path carbon file path
    */
-  public LatestFilesReadCommittedScope(String path) throws IOException {
-    this(path, null);
+  public LatestFilesReadCommittedScope(String path, Configuration 
configuration)
+      throws IOException {
+    this(path, null, configuration);
   }
 
   /**
@@ -75,7 +82,8 @@ public class LatestFilesReadCommittedScope implements 
ReadCommittedScope {
    *
    * @param indexFiles carbon index files
    */
-  public LatestFilesReadCommittedScope(CarbonFile[] indexFiles) {
+  public LatestFilesReadCommittedScope(CarbonFile[] indexFiles, Configuration 
configuration) {
+    this.configuration = configuration;
     takeCarbonIndexFileSnapShot(indexFiles);
   }
 
@@ -180,7 +188,7 @@ public class LatestFilesReadCommittedScope implements 
ReadCommittedScope {
         carbonIndexFiles = indexFiles.toArray(new CarbonFile[0]);
       } else {
         String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, 
segmentId);
-        carbonIndexFiles = 
SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+        carbonIndexFiles = 
SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, configuration);
       }
       if (carbonIndexFiles.length == 0) {
         throw new IOException(
@@ -232,4 +240,11 @@ public class LatestFilesReadCommittedScope implements 
ReadCommittedScope {
     prepareLoadMetadata();
   }
 
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  @Override public void setConfiguration(Configuration configuration) {
+    this.configuration = configuration;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
index cbcf173..aea7e97 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -27,6 +27,8 @@ import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * ReadCommitted interface that defines a read scope.
  */
@@ -48,4 +50,8 @@ import 
org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
       throws IOException;
 
   void takeCarbonIndexFileSnapShot() throws IOException;
+
+  Configuration getConfiguration();
+
+  void setConfiguration(Configuration configuration);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 1f61aab..ac0d156 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -31,6 +31,8 @@ import 
org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * ReadCommittedScope for the managed carbon table
  */
@@ -38,18 +40,24 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 @InterfaceStability.Stable
 public class TableStatusReadCommittedScope implements ReadCommittedScope {
 
+  private static final long serialVersionUID = 2324397174595872738L;
   private LoadMetadataDetails[] loadMetadataDetails;
 
   private AbsoluteTableIdentifier identifier;
 
-  public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier) 
throws IOException {
+  private transient Configuration configuration;
+
+  public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
+      Configuration configuration) throws IOException {
     this.identifier = identifier;
+    this.configuration = configuration;
     takeCarbonIndexFileSnapShot();
   }
 
   public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
-      LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+      LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) 
throws IOException {
     this.identifier = identifier;
+    this.configuration = configuration;
     this.loadMetadataDetails = loadMetadataDetails;
   }
 
@@ -97,4 +105,11 @@ public class TableStatusReadCommittedScope implements 
ReadCommittedScope {
         
.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
   }
 
+  @Override public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  @Override public void setConfiguration(Configuration configuration) {
+    this.configuration = configuration;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
index dfd5815..d3d9177 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
@@ -20,11 +20,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.format.FileHeader;
 
 import static 
org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 
 /**
@@ -36,10 +38,17 @@ public class CarbonHeaderReader {
   //Fact file path
   private String filePath;
 
+  private Configuration configuration;
+
   public CarbonHeaderReader(String filePath) {
     this.filePath = filePath;
   }
 
+  public CarbonHeaderReader(String filePath, Configuration configuration) {
+    this.filePath = filePath;
+    this.configuration = configuration;
+  }
+
   /**
    * It reads the metadata in FileFooter thrift object format.
    *
@@ -62,12 +71,12 @@ public class CarbonHeaderReader {
    * @throws IOException
    */
   private ThriftReader openThriftReader(String filePath) {
-
+    Configuration conf = configuration != null ? configuration : 
FileFactory.getConfiguration();
     return new ThriftReader(filePath, new ThriftReader.TBaseCreator() {
       @Override public TBase create() {
         return new FileHeader();
       }
-    });
+    }, conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
index 4617a12..27eaa32 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.IndexHeader;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 
 /**
@@ -28,6 +29,15 @@ import org.apache.thrift.TBase;
  */
 public class CarbonIndexFileReader {
 
+  private Configuration configuration;
+
+  public CarbonIndexFileReader() {
+
+  }
+
+  public CarbonIndexFileReader(Configuration configuration) {
+    this.configuration = configuration;
+  }
   /**
    * reader
    */
@@ -75,7 +85,7 @@ public class CarbonIndexFileReader {
    * @throws IOException
    */
   public void openThriftReader(String filePath) throws IOException {
-    thriftReader = new ThriftReader(filePath);
+    thriftReader = new ThriftReader(filePath, configuration);
     thriftReader.open();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java 
b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
index 221a285..48d8345 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -55,6 +56,8 @@ public class ThriftReader {
    */
   private TProtocol binaryIn;
 
+  private Configuration configuration;
+
   /**
    * Constructor.
    */
@@ -63,6 +66,12 @@ public class ThriftReader {
     this.creator = creator;
   }
 
+  public ThriftReader(String fileName, TBaseCreator creator, Configuration 
configuration) {
+    this.fileName = fileName;
+    this.configuration = configuration;
+    this.creator = creator;
+  }
+
   /**
    * Constructor.
    */
@@ -73,6 +82,14 @@ public class ThriftReader {
   /**
    * Constructor.
    */
+  public ThriftReader(String fileName, Configuration configuration) {
+    this.fileName = fileName;
+    this.configuration = configuration;
+  }
+
+  /**
+   * Constructor.
+   */
   public ThriftReader(byte[] fileData) {
     dataInputStream = new DataInputStream(new ByteArrayInputStream(fileData));
     binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
@@ -82,8 +99,9 @@ public class ThriftReader {
    * Opens the fileName for reading.
    */
   public void open() throws IOException {
+    Configuration conf = configuration != null ? configuration : 
FileFactory.getConfiguration();
     FileFactory.FileType fileType = FileFactory.getFileType(fileName);
-    dataInputStream = FileFactory.getDataInputStream(fileName, fileType, 
bufferSize);
+    dataInputStream = FileFactory.getDataInputStream(fileName, fileType, 
bufferSize, conf);
     binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index fdce76b..f1ee877 100755
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -55,6 +55,7 @@ import org.apache.carbondata.core.util.DeleteLoadFolders;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Manages Load/Segment status
@@ -66,8 +67,16 @@ public class SegmentStatusManager {
 
   private AbsoluteTableIdentifier identifier;
 
+  private Configuration configuration;
+
   public SegmentStatusManager(AbsoluteTableIdentifier identifier) {
     this.identifier = identifier;
+    configuration = FileFactory.getConfiguration();
+  }
+
+  public SegmentStatusManager(AbsoluteTableIdentifier identifier, 
Configuration configuration) {
+    this.identifier = identifier;
+    this.configuration = configuration;
   }
 
   /**
@@ -93,21 +102,10 @@ public class SegmentStatusManager {
     }
   }
 
-  /**
-   * get valid segment for given table
-   *
-   * @return
-   * @throws IOException
-   */
   public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws 
IOException {
     return getValidAndInvalidSegments(null, null);
   }
 
-  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
-      LoadMetadataDetails[] loadMetadataDetails) throws IOException {
-    return getValidAndInvalidSegments(loadMetadataDetails, null);
-  }
-
   /**
    * get valid segment for given load status details.
    */
@@ -129,7 +127,8 @@ public class SegmentStatusManager {
       }
 
       if (readCommittedScope == null) {
-        readCommittedScope = new TableStatusReadCommittedScope(identifier, 
loadMetadataDetails);
+        readCommittedScope = new TableStatusReadCommittedScope(identifier, 
loadMetadataDetails,
+            configuration);
       }
       //just directly iterate Array
       for (LoadMetadataDetails segment : loadMetadataDetails) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
 
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 168a526..27bc620 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -47,11 +47,19 @@ import 
org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockIndex;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Footer reader class
  */
 public abstract class AbstractDataFileFooterConverter {
 
+  protected Configuration configuration;
+
+  AbstractDataFileFooterConverter(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
   /**
    * Below method will be used to convert the thrift presence meta to wrapper
    * presence meta
@@ -77,7 +85,8 @@ public abstract class AbstractDataFileFooterConverter {
    * @return list of index info
    * @throws IOException problem while reading the index file
    */
-  public List<DataFileFooter> getIndexInfo(String filePath, 
List<TableBlockInfo> tableBlockInfoList)
+  public List<DataFileFooter> getIndexInfo(String filePath, 
List<TableBlockInfo>
+      tableBlockInfoList)
       throws IOException {
     CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
     List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
@@ -144,7 +153,7 @@ public abstract class AbstractDataFileFooterConverter {
    */
   public List<DataFileFooter> getIndexInfo(String filePath, byte[] fileData,
       boolean isTransactionalTable) throws IOException {
-    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+    CarbonIndexFileReader indexReader = new CarbonIndexFileReader(configuration);
     List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
     String parentPath = filePath.substring(0, filePath.lastIndexOf("/"));
     try {
@@ -188,7 +197,7 @@ public abstract class AbstractDataFileFooterConverter {
         }
         if (readBlockIndexInfo.isSetBlocklet_info()) {
           List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
-          BlockletInfo blockletInfo = new DataFileFooterConverterV3()
+          BlockletInfo blockletInfo = new DataFileFooterConverterV3(configuration)
               .getBlockletInfo(readBlockIndexInfo.getBlocklet_info(),
                   CarbonUtil.getNumberOfDimensionColumns(columnSchemaList));
           blockletInfo.setBlockletIndex(blockletIndex);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 8e8b075..f14610c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -61,6 +61,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 
@@ -80,7 +81,7 @@ public class BlockletDataMapUtil {
         && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
       CarbonFile indexMergeFile = FileFactory.getCarbonFile(
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR 
+ identifier
-              .getMergeIndexFileName());
+              .getMergeIndexFileName(), identifierWrapper.getConfiguration());
       if (indexMergeFile.exists() && 
!filesRead.contains(indexMergeFile.getPath())) {
         indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { 
indexMergeFile });
         filesRead.add(indexMergeFile.getPath());
@@ -89,7 +90,7 @@ public class BlockletDataMapUtil {
     if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
       indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { 
FileFactory.getCarbonFile(
           identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR 
+ identifier
-              .getIndexFileName()) });
+              .getIndexFileName(), identifierWrapper.getConfiguration()) });
     }
     Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
     CarbonTable carbonTable = identifierWrapper.getCarbonTable();
@@ -98,7 +99,8 @@ public class BlockletDataMapUtil {
       tableColumnList =
           carbonTable.getTableInfo().getFactTable().getListOfColumns();
     }
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    DataFileFooterConverter fileFooterConverter =
+        new DataFileFooterConverter(identifierWrapper.getConfiguration());
     List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
         identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + 
identifier
             .getIndexFileName(), 
indexFileStore.getFileData(identifier.getIndexFileName()),
@@ -139,9 +141,9 @@ public class BlockletDataMapUtil {
    * @throws IOException
    */
   public static Map<String, BlockMetaInfo> 
createCarbonDataFileBlockMetaInfoMapping(
-      String segmentFilePath) throws IOException {
+      String segmentFilePath, Configuration configuration) throws IOException {
     Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration);
     if (carbonFile instanceof AbstractDFSCarbonFile) {
       PathFilter pathFilter = new PathFilter() {
         @Override public boolean accept(Path path) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index c9601c0..fa982be 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1277,34 +1277,6 @@ public final class CarbonUtil {
   }
 
   /**
-   * Below method will be used to get all the block index info from index file
-   *
-   * @param taskId                  task id of the file
-   * @param tableBlockInfoList      list of table block
-   * @param identifier absolute table identifier
-   * @return list of block info
-   * @throws IOException if any problem while reading
-   */
-  public static List<DataFileFooter> readCarbonIndexFile(String taskId, String 
bucketNumber,
-      List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier 
identifier)
-      throws IOException {
-    // need to sort the  block info list based for task in ascending  order so
-    // it will be sinkup with block index read from file
-    Collections.sort(tableBlockInfoList);
-    // geting the index file path
-    //TODO need to pass proper partition number when partiton will be supported
-    String carbonIndexFilePath = CarbonTablePath
-        .getCarbonIndexFilePath(identifier.getTablePath(), taskId,
-            tableBlockInfoList.get(0).getSegmentId(),
-            bucketNumber, CarbonTablePath.DataFileUtil
-                
.getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()),
-            tableBlockInfoList.get(0).getVersion());
-    DataFileFooterConverter fileFooterConverter = new 
DataFileFooterConverter();
-    // read the index info and return
-    return fileFooterConverter.getIndexInfo(carbonIndexFilePath, 
tableBlockInfoList);
-  }
-
-  /**
    * initialize the value of dictionary chunk that can be kept in memory at a 
time
    *
    * @return
@@ -2243,17 +2215,17 @@ public final class CarbonUtil {
     }
   }
 
-  public static String getFilePathExternalFilePath(String path) {
+  public static String getFilePathExternalFilePath(String path, Configuration configuration) {
 
     // return the list of carbondata files in the given path.
-    CarbonFile segment = FileFactory.getCarbonFile(path, 
FileFactory.getFileType(path));
+    CarbonFile segment = FileFactory.getCarbonFile(path, configuration);
 
     CarbonFile[] dataFiles = segment.listFiles();
     for (CarbonFile dataFile : dataFiles) {
       if (dataFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
         return dataFile.getAbsolutePath();
       } else if (dataFile.isDirectory()) {
-        return getFilePathExternalFilePath(dataFile.getAbsolutePath());
+        return getFilePathExternalFilePath(dataFile.getAbsolutePath(), 
configuration);
       }
     }
     return null;
@@ -2265,12 +2237,14 @@ public final class CarbonUtil {
    * @return table info containing the schema
    */
   public static org.apache.carbondata.format.TableInfo inferSchema(String 
carbonDataFilePath,
-      String tableName, boolean isCarbonFileProvider) throws IOException {
+      String tableName, boolean isCarbonFileProvider, Configuration 
configuration)
+      throws IOException {
     String fistFilePath = null;
     if (isCarbonFileProvider) {
-      fistFilePath = getFilePathExternalFilePath(carbonDataFilePath + 
"/Fact/Part0/Segment_null");
+      fistFilePath = getFilePathExternalFilePath(carbonDataFilePath + 
"/Fact/Part0/Segment_null",
+          configuration);
     } else {
-      fistFilePath = getFilePathExternalFilePath(carbonDataFilePath);
+      fistFilePath = getFilePathExternalFilePath(carbonDataFilePath, 
configuration);
     }
     if (fistFilePath == null) {
       // Check if we can infer the schema from the hive metastore.
@@ -2645,7 +2619,7 @@ public final class CarbonUtil {
     HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
     Map<String, SegmentFileStore.FolderDetails> locationMap = 
fileStore.getLocationMap();
     if (locationMap != null) {
-      fileStore.readIndexFiles();
+      fileStore.readIndexFiles(FileFactory.getConfiguration());
       Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
       // get the size of carbonindex file
       carbonIndexSize = getCarbonIndexSize(fileStore, locationMap);
@@ -3192,8 +3166,7 @@ public final class CarbonUtil {
    * @param carbonTable
    * carbon Table
    */
-  public static ColumnarFormatVersion getFormatVersion(CarbonTable carbonTable)
-      throws IOException {
+  public static ColumnarFormatVersion getFormatVersion(CarbonTable 
carbonTable) throws IOException {
     String segmentPath = null;
     boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
     CarbonIndexFileReader indexReader = new CarbonIndexFileReader();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 670536e..61b4f37 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -33,12 +33,22 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.reader.CarbonFooterReader;
 import org.apache.carbondata.format.FileFooter;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to convert the thrift object of data file
  * meta data to wrapper object
  */
 public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
 
+  public DataFileFooterConverter(Configuration configuration) {
+    super(configuration);
+  }
+
+  public DataFileFooterConverter() {
+    super(FileFactory.getConfiguration());
+  }
+
   /**
    * Below method will be used to convert thrift file meta to wrapper file meta
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index b20a336..db52991 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
@@ -29,6 +30,8 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.reader.CarbonFooterReader;
 import org.apache.carbondata.format.FileFooter;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * Below class will be used to convert the thrift object of data file
  * meta data to wrapper object for version 2 data file
@@ -36,6 +39,14 @@ import org.apache.carbondata.format.FileFooter;
 
 public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
 
+  public DataFileFooterConverter2(Configuration configuration) {
+    super(configuration);
+  }
+
+  public DataFileFooterConverter2() {
+    super(FileFactory.getConfiguration());
+  }
+
   /**
    * Below method will be used to convert thrift file meta to wrapper file meta
    */
@@ -136,6 +147,6 @@ public class DataFileFooterConverter2 extends 
AbstractDataFileFooterConverter {
   }
 
   @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) 
throws IOException {
-    return new DataFileFooterConverter().getSchema(tableBlockInfo);
+    return new DataFileFooterConverter(configuration).getSchema(tableBlockInfo);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index 6a968b4..41e22fd 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
@@ -31,8 +32,18 @@ import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.format.FileFooter3;
 import org.apache.carbondata.format.FileHeader;
 
+import org.apache.hadoop.conf.Configuration;
+
 public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter 
{
 
+  public DataFileFooterConverterV3(Configuration configuration) {
+    super(configuration);
+  }
+
+  public DataFileFooterConverterV3() {
+    super(FileFactory.getConfiguration());
+  }
+
   /**
    * Below method will be used to convert thrift file meta to wrapper file meta
    * This method will read the footer from footer offset present in the data 
file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 1634091..9dde6b7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
@@ -77,7 +78,8 @@ public class CarbonIndexFileMergeWriter {
       List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
       indexFiles = indexCarbonFiles.toArray(new 
CarbonFile[indexCarbonFiles.size()]);
     } else {
-      indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+      indexFiles =
+          SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
     }
     if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != 
null) {
       if (sfs == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
 
b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
index a3acfab..34dca0b 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -40,10 +40,12 @@ import 
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 
 import mockit.Deencapsulation;
 import mockit.Mock;
 import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -103,7 +105,8 @@ public class TestBlockletDataMapFactory {
             BlockletDataMapIndexWrapper.class);
     method.setAccessible(true);
     method.invoke(blockletDataMapFactory, 
tableBlockIndexUniqueIdentifierWrapper,
-        new 
BlockletDataMapIndexWrapper(tableBlockIndexUniqueIdentifier.getSegmentId(), 
dataMaps));
+        new 
BlockletDataMapIndexWrapper(tableBlockIndexUniqueIdentifier.getSegmentId(), 
dataMaps,
+            tableBlockIndexUniqueIdentifierWrapper.getConfiguration()));
     BlockletDataMapIndexWrapper result = 
cache.getIfPresent(tableBlockIndexUniqueIdentifierWrapper);
     assert null != result;
   }
@@ -111,7 +114,8 @@ public class TestBlockletDataMapFactory {
   @Test public void getValidDistributables() throws IOException {
     BlockletDataMapDistributable blockletDataMapDistributable = new 
BlockletDataMapDistributable(
         
"/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex");
-    Segment segment = new Segment("0", null);
+    Segment segment = new Segment("0", null, new 
TableStatusReadCommittedScope(carbonTable
+        .getAbsoluteTableIdentifier(), new Configuration(false)));
     blockletDataMapDistributable.setSegment(segment);
     BlockletDataMapDistributable blockletDataMapDistributable1 = new 
BlockletDataMapDistributable(
         
"/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756701.carbonindex");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java 
b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 5520bfb..a4abc61 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -684,8 +684,8 @@ public class CarbonUtilTest {
 
   @Test public void testToGetSegmentString() {
     List<Segment> list = new ArrayList<>();
-    list.add(new Segment("1", null));
-    list.add(new Segment("2", null));
+    list.add(new Segment("1", null, null));
+    list.add(new Segment("2", null, null));
     String segments = CarbonUtil.convertToString(list);
     assertEquals(segments, "1,2");
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
 
b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index e506994..2705b63 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -46,6 +46,7 @@ import org.apache.carbondata.format.IndexHeader;
 
 import mockit.Mock;
 import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 import static junit.framework.TestCase.assertEquals;
@@ -143,7 +144,7 @@ public class DataFileFooterConverterTest {
     new MockUp<FileFactory>() {
       @SuppressWarnings("unused") @Mock
       public DataInputStream getDataInputStream(String path, 
FileFactory.FileType fileType,
-          int bufferSize) {
+          int bufferSize, Configuration configuration) {
         return dataInputStream;
       }
     };

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 0d240c4..1e5b79c 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -237,7 +237,7 @@ public class BloomCoarseGrainDataMapFactory extends 
DataMapFactory<CoarseGrainDa
       }
       for (String shard : shardPaths) {
         BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
-        bloomDM.init(new BloomDataMapModel(shard, cache));
+        bloomDM.init(new BloomDataMapModel(shard, cache, segment.getConfiguration()));
         bloomDM.initIndexColumnConverters(getCarbonTable(), 
dataMapMeta.getIndexedColumns());
         dataMaps.add(bloomDM);
       }
@@ -253,7 +253,8 @@ public class BloomCoarseGrainDataMapFactory extends 
DataMapFactory<CoarseGrainDa
     List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
     BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new 
BloomCoarseGrainDataMap();
     String indexPath = ((BloomDataMapDistributable) 
distributable).getIndexPath();
-    bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache));
+    bloomCoarseGrainDataMap
+        .init(new BloomDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
     bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(),
         dataMapMeta.getIndexedColumns());
     coarseGrainDataMaps.add(bloomCoarseGrainDataMap);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
index 9d5d741..7ae4906 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
@@ -19,13 +19,16 @@ package org.apache.carbondata.datamap.bloom;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 
+import org.apache.hadoop.conf.Configuration;
+
 public class BloomDataMapModel extends DataMapModel {
 
   private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> 
cache;
 
   public BloomDataMapModel(String filePath,
-      Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache) 
{
-    super(filePath);
+      Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache,
+      Configuration configuration) {
+    super(filePath, configuration);
     this.cache = cache;
   }
 

Reply via email to