[CARBONDATA-2909] Multi-user support for SDK on S3. Added support for multiple users with different SK/AK to write concurrently to S3. Made it mandatory for the user to provide a Hadoop configuration while creating an SDK writer/reader. Passed the Hadoop configuration to the core layer so that FileFactory can access it. Fixed various SK/AK-not-found exceptions in CarbonSparkFileFormat.
This closes #2678 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8f1a029b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8f1a029b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8f1a029b Branch: refs/heads/master Commit: 8f1a029b9ad82cb3d1972e63be2055c84895661b Parents: 7c827c0 Author: kunal642 <[email protected]> Authored: Fri Aug 31 11:12:30 2018 +0530 Committer: ravipesala <[email protected]> Committed: Tue Sep 11 17:46:05 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/datamap/DataMapUtil.java | 10 ++- .../apache/carbondata/core/datamap/Segment.java | 11 +++ .../carbondata/core/datamap/dev/DataMap.java | 3 +- .../core/datamap/dev/DataMapModel.java | 12 ++- .../filesystem/AbstractDFSCarbonFile.java | 18 +++-- .../core/datastore/impl/FileFactory.java | 5 ++ .../indexstore/BlockletDataMapIndexStore.java | 24 ++++-- .../indexstore/BlockletDataMapIndexWrapper.java | 14 +++- .../TableBlockIndexUniqueIdentifierWrapper.java | 19 +++++ .../indexstore/blockletindex/BlockDataMap.java | 6 +- .../blockletindex/BlockletDataMapFactory.java | 2 +- .../blockletindex/BlockletDataMapModel.java | 12 +-- .../blockletindex/SegmentIndexFileStore.java | 37 ++++++--- .../core/metadata/SegmentFileStore.java | 22 +++--- .../core/metadata/schema/SchemaReader.java | 19 ++++- .../core/mutate/CarbonUpdateUtil.java | 6 +- .../LatestFilesReadCommittedScope.java | 25 +++++-- .../core/readcommitter/ReadCommittedScope.java | 6 ++ .../TableStatusReadCommittedScope.java | 19 ++++- .../core/reader/CarbonHeaderReader.java | 13 +++- .../core/reader/CarbonIndexFileReader.java | 12 ++- .../carbondata/core/reader/ThriftReader.java | 20 ++++- .../statusmanager/SegmentStatusManager.java | 23 +++--- .../util/AbstractDataFileFooterConverter.java | 15 +++- .../core/util/BlockletDataMapUtil.java | 12 +-- 
.../apache/carbondata/core/util/CarbonUtil.java | 47 +++--------- .../core/util/DataFileFooterConverter.java | 10 +++ .../core/util/DataFileFooterConverter2.java | 13 +++- .../core/util/DataFileFooterConverterV3.java | 11 +++ .../core/writer/CarbonIndexFileMergeWriter.java | 4 +- .../TestBlockletDataMapFactory.java | 8 +- .../carbondata/core/util/CarbonUtilTest.java | 4 +- .../core/util/DataFileFooterConverterTest.java | 3 +- .../bloom/BloomCoarseGrainDataMapFactory.java | 5 +- .../datamap/bloom/BloomDataMapModel.java | 7 +- .../lucene/LuceneFineGrainDataMapFactory.java | 7 +- docs/sdk-guide.md | 21 ++++-- .../examples/sdk/CarbonReaderExample.java | 7 +- .../carbondata/examples/sdk/SDKS3Example.java | 6 +- .../carbondata/examples/DirectSQLExample.scala | 8 +- .../carbondata/examples/S3UsingSDkExample.scala | 5 +- .../hadoop/api/CarbonFileInputFormat.java | 7 +- .../hadoop/api/CarbonOutputCommitter.java | 8 +- .../hadoop/api/CarbonTableInputFormat.java | 10 ++- .../hadoop/api/CarbonTableOutputFormat.java | 3 + .../sdv/generated/SDKwriterTestCase.scala | 11 +-- ...ithColumnMetCacheAndCacheLevelProperty.scala | 5 +- ...FileInputFormatWithExternalCarbonTable.scala | 6 +- .../TestNonTransactionalCarbonTable.scala | 49 +++++++----- ...tNonTransactionalCarbonTableJsonWriter.scala | 3 +- ...ansactionalCarbonTableWithAvroDataType.scala | 58 +++++++------- ...ransactionalCarbonTableWithComplexType.scala | 9 ++- ...tSparkCarbonFileFormatWithSparkSession.scala | 7 +- .../dataload/TestDataLoadWithFileName.scala | 5 +- .../dataload/TestGlobalSortDataLoad.scala | 3 +- .../testsuite/datamap/CGDataMapTestCase.scala | 8 +- .../testsuite/datamap/FGDataMapTestCase.scala | 8 +- .../testsuite/datamap/TestDataMapCommand.scala | 4 +- .../TestDataLoadingForPartitionTable.scala | 3 +- .../StandardPartitionTableLoadingTestCase.scala | 3 +- .../execution/datasources/CarbonFileIndex.scala | 2 +- .../datasources/SparkCarbonFileFormat.scala | 8 +- .../datasource/SparkCarbonDataSourceTest.scala | 
2 +- ...tCreateTableUsingSparkCarbonFileFormat.scala | 20 ++--- .../spark/rdd/CarbonDataRDDFactory.scala | 2 +- .../command/mutation/DeleteExecution.scala | 5 +- .../processing/merger/CarbonDataMergerUtil.java | 2 +- .../processing/util/CarbonLoaderUtil.java | 2 +- .../carbondata/sdk/file/AvroCarbonWriter.java | 3 +- .../carbondata/sdk/file/CSVCarbonWriter.java | 3 +- .../sdk/file/CarbonReaderBuilder.java | 5 +- .../sdk/file/CarbonWriterBuilder.java | 38 ++++------ .../carbondata/sdk/file/JsonCarbonWriter.java | 8 +- .../sdk/file/AvroCarbonWriterTest.java | 14 ++-- .../sdk/file/CSVCarbonWriterTest.java | 8 +- .../CSVNonTransactionalCarbonWriterTest.java | 4 +- .../carbondata/sdk/file/CarbonReaderTest.java | 79 ++++++++++---------- .../sdk/file/ConcurrentAvroSdkWriterTest.java | 6 +- .../sdk/file/ConcurrentSdkWriterTest.java | 5 +- .../apache/carbondata/sdk/file/TestUtil.java | 5 +- .../store/worker/SearchRequestHandler.java | 3 +- 81 files changed, 616 insertions(+), 349 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java index e015052..60c5233 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import 
org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -99,7 +100,7 @@ public class DataMapUtil { } String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat"; SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo = - getValidAndInvalidSegments(carbonTable); + getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration()); List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments(); List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments(); DataMapExprWrapper dataMapExprWrapper = null; @@ -140,7 +141,7 @@ public class DataMapUtil { List<PartitionSpec> partitionsToPrune) throws IOException { String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat"; SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo = - getValidAndInvalidSegments(carbonTable); + getValidAndInvalidSegments(carbonTable, validSegments.get(0).getConfiguration()); List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments(); DistributableDataMapFormat dataMapFormat = createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments, @@ -152,8 +153,9 @@ public class DataMapUtil { } private static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments( - CarbonTable carbonTable) throws IOException { - SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier()); + CarbonTable carbonTable, Configuration configuration) throws IOException { + SegmentStatusManager ssm = + new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration); return ssm.getValidAndInvalidSegments(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java index 30e811a..85445eb 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -32,6 +32,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.hadoop.conf.Configuration; + /** * Represents one load of carbondata */ @@ -64,6 +66,11 @@ public class Segment implements Serializable { this.segmentNo = segmentNo; } + public Segment(String segmentNo, ReadCommittedScope readCommittedScope) { + this.segmentNo = segmentNo; + this.readCommittedScope = readCommittedScope; + } + /** * ReadCommittedScope will be null. So getCommittedIndexFile will not work and will throw * a NullPointerException. In case getCommittedIndexFile is need to be accessed then @@ -202,6 +209,10 @@ public class Segment implements Serializable { return null; } + public Configuration getConfiguration() { + return readCommittedScope.getConfiguration(); + } + public Set<String> getFilteredIndexShardNames() { return filteredIndexShardNames; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java index 456776b..47eeafe 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -37,7 +37,8 @@ public interface DataMap<T extends Blocklet> { /** * It is called to load the data map to memory or to initialize 
it. */ - void init(DataMapModel dataMapModel) throws MemoryException, IOException; + void init(DataMapModel dataMapModel) + throws MemoryException, IOException; /** * Prune the datamap with resolved filter expression and partition information. http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java index 76bbeee..5f4d1dd 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.datamap.dev; +import org.apache.hadoop.conf.Configuration; + /** * Information required to build datamap */ @@ -24,11 +26,19 @@ public class DataMapModel { private String filePath; - public DataMapModel(String filePath) { + private Configuration configuration; + + public DataMapModel(String filePath, Configuration configuration) { this.filePath = filePath; + this.configuration = configuration; } public String getFilePath() { return filePath; } + + public Configuration getConfiguration() { + return configuration; + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java index 5128022..41215a3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -282,7 +282,12 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType, int bufferSize, Configuration hadoopConf) throws IOException { return getDataInputStream(path, fileType, bufferSize, - CarbonUtil.inferCompressorFromFileName(path)); + CarbonUtil.inferCompressorFromFileName(path), hadoopConf); + } + + @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType, + int bufferSize, String compressor) throws IOException { + return getDataInputStream(path, fileType, bufferSize, FileFactory.getConfiguration()); } /** @@ -305,12 +310,12 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { return new DataInputStream(new BufferedInputStream(stream)); } - @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType, - int bufferSize, String compressor) throws IOException { + private DataInputStream getDataInputStream(String path, FileFactory.FileType fileType, + int bufferSize, String compressor, Configuration configuration) throws IOException { path = path.replace("\\", "/"); Path pt = new Path(path); InputStream inputStream; - FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration()); + FileSystem fs = pt.getFileSystem(configuration); if (bufferSize <= 0) { inputStream = fs.open(pt); } else { @@ -509,7 +514,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { RemoteIterator<LocatedFileStatus> listStatus = null; if (null != fileStatus && fileStatus.isDirectory()) { Path path = fileStatus.getPath(); - listStatus = path.getFileSystem(FileFactory.getConfiguration()).listFiles(path, recursive); + listStatus = fs.listFiles(path, recursive); } else { return new ArrayList<CarbonFile>(); } @@ -521,8 +526,7 @@ public abstract class AbstractDFSCarbonFile 
implements CarbonFile { if (null != fileStatus && fileStatus.isDirectory()) { List<FileStatus> listStatus = new ArrayList<>(); Path path = fileStatus.getPath(); - RemoteIterator<LocatedFileStatus> iter = - path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path); + RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); while (iter.hasNext()) { LocatedFileStatus fileStatus = iter.next(); if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index b462e0c..e8f6cfb 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -111,6 +111,11 @@ public final class FileFactory { return getDataInputStream(path, fileType, -1); } + public static DataInputStream getDataInputStream(String path, FileType fileType, + Configuration configuration) throws IOException { + return getDataInputStream(path, fileType, -1, configuration); + } + public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize) throws IOException { return getDataInputStream(path, fileType, bufferSize, getConfiguration()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index 
fa84f30..323899e 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -40,6 +40,8 @@ import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.util.BlockletDataMapUtil; +import org.apache.hadoop.conf.Configuration; + /** * Class to handle loading, unloading,clearing,storing of the table * blocks @@ -87,7 +89,8 @@ public class BlockletDataMapIndexStore List<BlockDataMap> dataMaps = new ArrayList<>(); if (blockletDataMapIndexWrapper == null) { try { - SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + SegmentIndexFileStore indexFileStore = + new SegmentIndexFileStore(identifierWrapper.getConfiguration()); Set<String> filesRead = new HashSet<>(); String segmentFilePath = identifier.getIndexFilePath(); if (segInfoCache == null) { @@ -97,7 +100,8 @@ public class BlockletDataMapIndexStore segInfoCache.get(segmentFilePath); if (carbonDataFileBlockMetaInfoMapping == null) { carbonDataFileBlockMetaInfoMapping = - BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath); + BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath, + identifierWrapper.getConfiguration()); segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping); } // if the identifier is not a merge file we can directly load the datamaps @@ -107,10 +111,12 @@ public class BlockletDataMapIndexStore carbonDataFileBlockMetaInfoMapping); BlockDataMap blockletDataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap, - identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe()); + identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe(), + identifierWrapper.getConfiguration()); dataMaps.add(blockletDataMap); blockletDataMapIndexWrapper = - 
new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps); + new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps, + identifierWrapper.getConfiguration()); } else { // if the identifier is a merge file then collect the index files and load the datamaps List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = @@ -125,12 +131,14 @@ public class BlockletDataMapIndexStore BlockDataMap blockletDataMap = loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap, identifierWrapper.getCarbonTable(), - identifierWrapper.isAddTableBlockToUnsafe()); + identifierWrapper.isAddTableBlockToUnsafe(), + identifierWrapper.getConfiguration()); dataMaps.add(blockletDataMap); } } blockletDataMapIndexWrapper = - new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps); + new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps, + identifierWrapper.getConfiguration()); } lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper, blockletDataMapIndexWrapper.getMemorySize()); @@ -265,7 +273,7 @@ public class BlockletDataMapIndexStore */ private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap, - CarbonTable carbonTable, boolean addTableBlockToUnsafe) + CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration configuration) throws IOException, MemoryException { String uniqueTableSegmentIdentifier = identifier.getUniqueTableSegmentIdentifier(); @@ -279,7 +287,7 @@ public class BlockletDataMapIndexStore dataMap.init(new BlockletDataMapModel(carbonTable, identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()), - blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe)); + blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, 
configuration)); } return dataMap; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java index 2cf0259..7b8a13b 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java @@ -24,19 +24,27 @@ import org.apache.carbondata.core.cache.Cacheable; import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap; +import org.apache.hadoop.conf.Configuration; + /** * A cacheable wrapper of datamaps */ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable { + private static final long serialVersionUID = -2859075086955465810L; + private List<BlockDataMap> dataMaps; private String segmentId; + private transient Configuration configuration; + // size of the wrapper. 
basically the total size of the datamaps this wrapper is holding private long wrapperSize; - public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps) { + public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps, Configuration + configuration) { + this.configuration = configuration; this.dataMaps = dataMaps; this.wrapperSize = 0L; this.segmentId = segmentId; @@ -72,4 +80,8 @@ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable { public String getSegmentId() { return segmentId; } + + public Configuration getConfiguration() { + return configuration; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java index 0924f1f..77756de 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java @@ -19,8 +19,11 @@ package org.apache.carbondata.core.indexstore; import java.io.Serializable; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.hadoop.conf.Configuration; + /** * Class holds reference to TableBlockIndexUniqueIdentifier and carbonTable related info * This is just a wrapper passed between methods like a context, This object must never be cached. 
@@ -35,6 +38,8 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable { // holds the reference to CarbonTable private CarbonTable carbonTable; + + private transient Configuration configuration; /** * flag to specify whether to load table block metadata in unsafe or safe. Default value is true */ @@ -44,6 +49,15 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable { TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable) { this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier; this.carbonTable = carbonTable; + this.configuration = FileFactory.getConfiguration(); + } + + public TableBlockIndexUniqueIdentifierWrapper( + TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable, + Configuration configuration) { + this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier; + this.carbonTable = carbonTable; + this.configuration = configuration; } // Note: The constructor is getting used in extensions with other functionalities. 
@@ -53,6 +67,7 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable { boolean addTableBlockToUnsafe) { this(tableBlockIndexUniqueIdentifier, carbonTable); this.addTableBlockToUnsafe = addTableBlockToUnsafe; + this.configuration = FileFactory.getConfiguration(); } @@ -67,4 +82,8 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable { public boolean isAddTableBlockToUnsafe() { return addTableBlockToUnsafe; } + + public Configuration getConfiguration() { + return configuration; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index 2dbf6a0..57c92c6 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -103,11 +103,13 @@ public class BlockDataMap extends CoarseGrainDataMap */ protected boolean isFilePathStored; - @Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException { + @Override public void init(DataMapModel dataMapModel) + throws IOException, MemoryException { long startTime = System.currentTimeMillis(); assert (dataMapModel instanceof BlockletDataMapModel); BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel; - DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + DataFileFooterConverter fileFooterConverter = + new DataFileFooterConverter(dataMapModel.getConfiguration()); List<DataFileFooter> indexInfo = fileFooterConverter .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData(), 
blockletDataMapInfo.getCarbonTable().isTransactionalTable()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index da2fa39..e16c3cd 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -137,7 +137,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { tableBlockIndexUniqueIdentifierWrappers.add( new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, - this.getCarbonTable())); + this.getCarbonTable(), segment.getConfiguration())); } } List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java index 180c812..0a75d59 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java @@ -22,6 +22,8 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel; import 
org.apache.carbondata.core.indexstore.BlockMetaInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.hadoop.conf.Configuration; + /** * It is the model object to keep the information to build or initialize BlockletDataMap. */ @@ -37,9 +39,9 @@ public class BlockletDataMapModel extends DataMapModel { private boolean addToUnsafe = true; - public BlockletDataMapModel(CarbonTable carbonTable, String filePath, - byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) { - super(filePath); + public BlockletDataMapModel(CarbonTable carbonTable, String filePath, byte[] fileData, + Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, Configuration configuration) { + super(filePath, configuration); this.fileData = fileData; this.blockMetaInfoMap = blockMetaInfoMap; this.segmentId = segmentId; @@ -48,8 +50,8 @@ public class BlockletDataMapModel extends DataMapModel { public BlockletDataMapModel(CarbonTable carbonTable, String filePath, byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, - boolean addToUnsafe) { - this(carbonTable, filePath, fileData, blockMetaInfoMap, segmentId); + boolean addToUnsafe, Configuration configuration) { + this(carbonTable, filePath, fileData, blockMetaInfoMap, segmentId, configuration); this.addToUnsafe = addToUnsafe; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index 25cfc26..f19c9c9 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -48,6 +48,7 @@ import org.apache.carbondata.format.BlockIndex; import org.apache.carbondata.format.MergedBlockIndex; import org.apache.carbondata.format.MergedBlockIndexHeader; +import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TBase; /** @@ -76,10 +77,20 @@ public class SegmentIndexFileStore { */ private Map<String, List<String>> carbonMergeFileToIndexFilesMap; + private Configuration configuration; + public SegmentIndexFileStore() { carbonIndexMap = new HashMap<>(); carbonIndexMapWithFullPath = new TreeMap<>(); carbonMergeFileToIndexFilesMap = new HashMap<>(); + configuration = FileFactory.getConfiguration(); + } + + public SegmentIndexFileStore(Configuration configuration) { + carbonIndexMap = new HashMap<>(); + carbonIndexMapWithFullPath = new TreeMap<>(); + carbonMergeFileToIndexFilesMap = new HashMap<>(); + this.configuration = configuration; } /** @@ -89,7 +100,7 @@ public class SegmentIndexFileStore { * @throws IOException */ public void readAllIIndexOfSegment(String segmentPath) throws IOException { - CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath); + CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath, configuration); for (int i = 0; i < carbonIndexFiles.length; i++) { if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { readMergeFile(carbonIndexFiles[i].getCanonicalPath()); @@ -155,7 +166,8 @@ public class SegmentIndexFileStore { * @throws IOException */ public void readAllIndexAndFillBolckletInfo(String segmentPath) throws IOException { - CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath); + CarbonFile[] carbonIndexFiles = + getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration()); for (int i = 0; i < carbonIndexFiles.length; i++) { if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { 
readMergeFile(carbonIndexFiles[i].getCanonicalPath()); @@ -190,7 +202,8 @@ public class SegmentIndexFileStore { * @throws IOException */ public Map<String, String> getIndexFilesFromSegment(String segmentPath) throws IOException { - CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath); + CarbonFile[] carbonIndexFiles = + getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration()); Map<String, String> indexFiles = new HashMap<>(); for (int i = 0; i < carbonIndexFiles.length; i++) { if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { @@ -216,7 +229,8 @@ public class SegmentIndexFileStore { */ public Map<String, String> getMergeOrIndexFilesFromSegment(String segmentPath) throws IOException { - CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath); + CarbonFile[] carbonIndexFiles = + getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration()); Map<String, String> indexFiles = new HashMap<>(); for (int i = 0; i < carbonIndexFiles.length; i++) { if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) { @@ -251,7 +265,7 @@ public class SegmentIndexFileStore { * @throws IOException */ public void readMergeFile(String mergeFilePath) throws IOException { - ThriftReader thriftReader = new ThriftReader(mergeFilePath); + ThriftReader thriftReader = new ThriftReader(mergeFilePath, configuration); try { thriftReader.open(); MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader); @@ -259,7 +273,7 @@ public class SegmentIndexFileStore { List<String> file_names = indexHeader.getFile_names(); carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names); List<ByteBuffer> fileData = mergedBlockIndex.getFileData(); - CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath); + CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath, configuration); String mergeFileAbsolutePath = mergeFile.getParentFile().getAbsolutePath(); assert (file_names.size() == 
fileData.size()); for (int i = 0; i < file_names.size(); i++) { @@ -282,8 +296,8 @@ public class SegmentIndexFileStore { */ private void readIndexFile(CarbonFile indexFile) throws IOException { String indexFilePath = indexFile.getCanonicalPath(); - DataInputStream dataInputStream = - FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath)); + DataInputStream dataInputStream = FileFactory + .getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath), configuration); byte[] bytes = new byte[(int) indexFile.getSize()]; try { dataInputStream.readFully(bytes); @@ -362,8 +376,8 @@ public class SegmentIndexFileStore { * @param segmentPath * @return */ - public static CarbonFile[] getCarbonIndexFiles(String segmentPath) { - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); + public static CarbonFile[] getCarbonIndexFiles(String segmentPath, Configuration configuration) { + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, configuration); return carbonFile.listFiles(new CarbonFileFilter() { @Override public boolean accept(CarbonFile file) { return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName() @@ -422,7 +436,8 @@ public class SegmentIndexFileStore { indexReader.openThriftReader(indexFile.getCanonicalPath()); // get the index header org.apache.carbondata.format.IndexHeader indexHeader = indexReader.readIndexHeader(); - DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + DataFileFooterConverter fileFooterConverter = + new DataFileFooterConverter(FileFactory.getConfiguration()); String filePath = FileFactory.getUpdatedFilePath(indexFile.getCanonicalPath()); String parentPath = filePath.substring(0, filePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 1acf0ea..44a2f7e 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -49,6 +49,7 @@ import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.path.CarbonTablePath; import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; /** @@ -469,8 +470,8 @@ public class SegmentFileStore { * @readSegment method before calling it. * @throws IOException */ - public void readIndexFiles() throws IOException { - readIndexFiles(SegmentStatus.SUCCESS, false); + public void readIndexFiles(Configuration configuration) throws IOException { + readIndexFiles(SegmentStatus.SUCCESS, false, configuration); } public SegmentFile getSegmentFile() { @@ -484,8 +485,8 @@ public class SegmentFileStore { * @param ignoreStatus * @throws IOException */ - private List<String> readIndexFiles(SegmentStatus status, boolean ignoreStatus) - throws IOException { + private List<String> readIndexFiles(SegmentStatus status, boolean ignoreStatus, + Configuration configuration) throws IOException { if (indexFilesMap != null) { return new ArrayList<>(); } @@ -494,7 +495,7 @@ public class SegmentFileStore { indexFilesMap = new HashMap<>(); indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, ignoreStatus); Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath(); - DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(configuration); for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) { 
List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue()); @@ -538,8 +539,8 @@ public class SegmentFileStore { Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath(); DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) { - List<DataFileFooter> indexInfo = - fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue()); + List<DataFileFooter> indexInfo = fileFooterConverter + .getIndexInfo(entry.getKey(), entry.getValue()); if (indexInfo.size() > 0) { schemaMap.put(entry.getKey(), indexInfo.get(0).getColumnInTable()); } @@ -733,8 +734,8 @@ public class SegmentFileStore { // take the list of files from this segment. SegmentFileStore fileStore = new SegmentFileStore(table.getTablePath(), segment.getSegmentFile()); - List<String> indexOrMergeFiles = - fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false); + List<String> indexOrMergeFiles = fileStore + .readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false, FileFactory.getConfiguration()); if (forceDelete) { deletePhysicalPartition( partitionSpecs, @@ -791,7 +792,8 @@ public class SegmentFileStore { List<PartitionSpec> partitionSpecs, SegmentUpdateStatusManager updateStatusManager) throws Exception { SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); - List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true); + List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true, + FileFactory.getConfiguration()); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey())); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java index 57370f6..d0bc976 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java @@ -29,6 +29,8 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.hadoop.conf.Configuration; + /** * TODO: It should be removed after store manager implementation. */ @@ -81,10 +83,25 @@ public class SchemaReader { } public static TableInfo inferSchema(AbsoluteTableIdentifier identifier, + boolean isCarbonFileProvider, Configuration configuration) throws IOException { + + org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil + .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider, + configuration); + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( + tableInfo, identifier.getDatabaseName(), identifier.getTableName(), + identifier.getTablePath()); + wrapperTableInfo.setTransactionalTable(false); + return wrapperTableInfo; + } + + public static TableInfo inferSchema(AbsoluteTableIdentifier identifier, boolean isCarbonFileProvider) throws IOException { org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil - .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider); + .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider, + 
FileFactory.getConfiguration()); SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( tableInfo, identifier.getDatabaseName(), identifier.getTableName(), http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java index 7df3937..d52eeb2 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java @@ -228,7 +228,7 @@ public class CarbonUpdateUtil { } // if the segments is in the list of marked for delete then update the status. - if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName(), null))) { + if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName()))) { loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp)); } @@ -391,7 +391,7 @@ public class CarbonUpdateUtil { List<String> dataFiles = new ArrayList<>(); if (segment.getSegmentFileName() != null) { SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); - fileStore.readIndexFiles(); + fileStore.readIndexFiles(FileFactory.getConfiguration()); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); List<String> dataFilePaths = new ArrayList<>(); for (List<String> paths : indexFilesMap.values()) { @@ -737,7 +737,7 @@ public class CarbonUpdateUtil { for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) { if (eachSeg.getValue() == 0) { - segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), null)); + 
segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), "")); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java index abd9c2c..9dafed9 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java @@ -36,6 +36,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.hadoop.conf.Configuration; + /** * This is a readCommittedScope for non transactional carbon table */ @@ -43,10 +45,12 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; @InterfaceStability.Stable public class LatestFilesReadCommittedScope implements ReadCommittedScope { + private static final long serialVersionUID = -839970494288861816L; private String carbonFilePath; private String segmentId; private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot; private LoadMetadataDetails[] loadMetadataDetails; + private transient Configuration configuration; /** * a new constructor of this class @@ -54,7 +58,9 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { * @param path carbon file path * @param segmentId segment id */ - public LatestFilesReadCommittedScope(String path, String segmentId) throws IOException { + public LatestFilesReadCommittedScope(String path, String segmentId, Configuration configuration) + throws IOException { + this.configuration = configuration; 
Objects.requireNonNull(path); this.carbonFilePath = path; this.segmentId = segmentId; @@ -66,8 +72,9 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { * * @param path carbon file path */ - public LatestFilesReadCommittedScope(String path) throws IOException { - this(path, null); + public LatestFilesReadCommittedScope(String path, Configuration configuration) + throws IOException { + this(path, null, configuration); } /** @@ -75,7 +82,8 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { * * @param indexFiles carbon index files */ - public LatestFilesReadCommittedScope(CarbonFile[] indexFiles) { + public LatestFilesReadCommittedScope(CarbonFile[] indexFiles, Configuration configuration) { + this.configuration = configuration; takeCarbonIndexFileSnapShot(indexFiles); } @@ -180,7 +188,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { carbonIndexFiles = indexFiles.toArray(new CarbonFile[0]); } else { String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId); - carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath); + carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, configuration); } if (carbonIndexFiles.length == 0) { throw new IOException( @@ -232,4 +240,11 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { prepareLoadMetadata(); } + public Configuration getConfiguration() { + return configuration; + } + + @Override public void setConfiguration(Configuration configuration) { + this.configuration = configuration; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java 
b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java index cbcf173..aea7e97 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java @@ -27,6 +27,8 @@ import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; +import org.apache.hadoop.conf.Configuration; + /** * ReadCommitted interface that defines a read scope. */ @@ -48,4 +50,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; throws IOException; void takeCarbonIndexFileSnapShot() throws IOException; + + Configuration getConfiguration(); + + void setConfiguration(Configuration configuration); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java index 1f61aab..ac0d156 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java @@ -31,6 +31,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.hadoop.conf.Configuration; + /** * ReadCommittedScope for the managed carbon table */ @@ -38,18 +40,24 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; @InterfaceStability.Stable public class 
TableStatusReadCommittedScope implements ReadCommittedScope { + private static final long serialVersionUID = 2324397174595872738L; private LoadMetadataDetails[] loadMetadataDetails; private AbsoluteTableIdentifier identifier; - public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier) throws IOException { + private transient Configuration configuration; + + public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier, + Configuration configuration) throws IOException { this.identifier = identifier; + this.configuration = configuration; takeCarbonIndexFileSnapShot(); } public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier, - LoadMetadataDetails[] loadMetadataDetails) throws IOException { + LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) throws IOException { this.identifier = identifier; + this.configuration = configuration; this.loadMetadataDetails = loadMetadataDetails; } @@ -97,4 +105,11 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope { .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath())); } + @Override public Configuration getConfiguration() { + return configuration; + } + + @Override public void setConfiguration(Configuration configuration) { + this.configuration = configuration; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java index dfd5815..d3d9177 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java @@ -20,11 +20,13 @@ import java.io.IOException; import java.util.ArrayList; import 
java.util.List; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.format.FileHeader; import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema; +import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TBase; /** @@ -36,10 +38,17 @@ public class CarbonHeaderReader { //Fact file path private String filePath; + private Configuration configuration; + public CarbonHeaderReader(String filePath) { this.filePath = filePath; } + public CarbonHeaderReader(String filePath, Configuration configuration) { + this.filePath = filePath; + this.configuration = configuration; + } + /** * It reads the metadata in FileFooter thrift object format. * @@ -62,12 +71,12 @@ public class CarbonHeaderReader { * @throws IOException */ private ThriftReader openThriftReader(String filePath) { - + Configuration conf = configuration != null ? configuration : FileFactory.getConfiguration(); return new ThriftReader(filePath, new ThriftReader.TBaseCreator() { @Override public TBase create() { return new FileHeader(); } - }); + }, conf); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java index 4617a12..27eaa32 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.carbondata.format.BlockIndex; import org.apache.carbondata.format.IndexHeader; +import org.apache.hadoop.conf.Configuration; import 
org.apache.thrift.TBase; /** @@ -28,6 +29,15 @@ import org.apache.thrift.TBase; */ public class CarbonIndexFileReader { + private Configuration configuration; + + public CarbonIndexFileReader() { + + } + + public CarbonIndexFileReader(Configuration configuration) { + this.configuration = configuration; + } /** * reader */ @@ -75,7 +85,7 @@ public class CarbonIndexFileReader { * @throws IOException */ public void openThriftReader(String filePath) throws IOException { - thriftReader = new ThriftReader(filePath); + thriftReader = new ThriftReader(filePath, configuration); thriftReader.open(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java index 221a285..48d8345 100644 --- a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java +++ b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java @@ -24,6 +24,7 @@ import java.io.IOException; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; @@ -55,6 +56,8 @@ public class ThriftReader { */ private TProtocol binaryIn; + private Configuration configuration; + /** * Constructor. */ @@ -63,6 +66,12 @@ public class ThriftReader { this.creator = creator; } + public ThriftReader(String fileName, TBaseCreator creator, Configuration configuration) { + this.fileName = fileName; + this.configuration = configuration; + this.creator = creator; + } + /** * Constructor. */ @@ -73,6 +82,14 @@ public class ThriftReader { /** * Constructor. 
*/ + public ThriftReader(String fileName, Configuration configuration) { + this.fileName = fileName; + this.configuration = configuration; + } + + /** + * Constructor. + */ public ThriftReader(byte[] fileData) { dataInputStream = new DataInputStream(new ByteArrayInputStream(fileData)); binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream)); @@ -82,8 +99,9 @@ public class ThriftReader { * Opens the fileName for reading. */ public void open() throws IOException { + Configuration conf = configuration != null ? configuration : FileFactory.getConfiguration(); FileFactory.FileType fileType = FileFactory.getFileType(fileName); - dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize); + dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize, conf); binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream)); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index fdce76b..f1ee877 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -55,6 +55,7 @@ import org.apache.carbondata.core.util.DeleteLoadFolders; import org.apache.carbondata.core.util.path.CarbonTablePath; import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; /** * Manages Load/Segment status @@ -66,8 +67,16 @@ public class SegmentStatusManager { private AbsoluteTableIdentifier identifier; + private Configuration configuration; + public SegmentStatusManager(AbsoluteTableIdentifier identifier) { this.identifier = identifier; 
+ configuration = FileFactory.getConfiguration(); + } + + public SegmentStatusManager(AbsoluteTableIdentifier identifier, Configuration configuration) { + this.identifier = identifier; + this.configuration = configuration; } /** @@ -93,21 +102,10 @@ public class SegmentStatusManager { } } - /** - * get valid segment for given table - * - * @return - * @throws IOException - */ public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException { return getValidAndInvalidSegments(null, null); } - public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments( - LoadMetadataDetails[] loadMetadataDetails) throws IOException { - return getValidAndInvalidSegments(loadMetadataDetails, null); - } - /** * get valid segment for given load status details. */ @@ -129,7 +127,8 @@ public class SegmentStatusManager { } if (readCommittedScope == null) { - readCommittedScope = new TableStatusReadCommittedScope(identifier, loadMetadataDetails); + readCommittedScope = new TableStatusReadCommittedScope(identifier, loadMetadataDetails, + configuration); } //just directly iterate Array for (LoadMetadataDetails segment : loadMetadataDetails) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java index 168a526..27bc620 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java +++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java @@ -47,11 +47,19 @@ import org.apache.carbondata.core.scan.executor.util.QueryUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.BlockIndex; +import 
org.apache.hadoop.conf.Configuration; + /** * Footer reader class */ public abstract class AbstractDataFileFooterConverter { + protected Configuration configuration; + + AbstractDataFileFooterConverter(Configuration configuration) { + this.configuration = configuration; + } + /** * Below method will be used to convert the thrift presence meta to wrapper * presence meta @@ -77,7 +85,8 @@ public abstract class AbstractDataFileFooterConverter { * @return list of index info * @throws IOException problem while reading the index file */ - public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList) + public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> + tableBlockInfoList) throws IOException { CarbonIndexFileReader indexReader = new CarbonIndexFileReader(); List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>(); @@ -144,7 +153,7 @@ public abstract class AbstractDataFileFooterConverter { */ public List<DataFileFooter> getIndexInfo(String filePath, byte[] fileData, boolean isTransactionalTable) throws IOException { - CarbonIndexFileReader indexReader = new CarbonIndexFileReader(); + CarbonIndexFileReader indexReader = new CarbonIndexFileReader(configuration); List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>(); String parentPath = filePath.substring(0, filePath.lastIndexOf("/")); try { @@ -188,7 +197,7 @@ public abstract class AbstractDataFileFooterConverter { } if (readBlockIndexInfo.isSetBlocklet_info()) { List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>(); - BlockletInfo blockletInfo = new DataFileFooterConverterV3() + BlockletInfo blockletInfo = new DataFileFooterConverterV3(configuration) .getBlockletInfo(readBlockIndexInfo.getBlocklet_info(), CarbonUtil.getNumberOfDimensionColumns(columnSchemaList)); blockletInfo.setBlockletIndex(blockletIndex); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java index 8e8b075..f14610c 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java @@ -61,6 +61,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; @@ -80,7 +81,7 @@ public class BlockletDataMapUtil { && indexFileStore.getFileData(identifier.getIndexFileName()) == null) { CarbonFile indexMergeFile = FileFactory.getCarbonFile( identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getMergeIndexFileName()); + .getMergeIndexFileName(), identifierWrapper.getConfiguration()); if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) { indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile }); filesRead.add(indexMergeFile.getPath()); @@ -89,7 +90,7 @@ public class BlockletDataMapUtil { if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) { indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile( identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getIndexFileName()) }); + .getIndexFileName(), identifierWrapper.getConfiguration()) }); } Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>(); CarbonTable carbonTable = identifierWrapper.getCarbonTable(); @@ -98,7 +99,8 @@ public class BlockletDataMapUtil { tableColumnList = 
carbonTable.getTableInfo().getFactTable().getListOfColumns(); } - DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + DataFileFooterConverter fileFooterConverter = + new DataFileFooterConverter(identifierWrapper.getConfiguration()); List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo( identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()), @@ -139,9 +141,9 @@ public class BlockletDataMapUtil { * @throws IOException */ public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping( - String segmentFilePath) throws IOException { + String segmentFilePath, Configuration configuration) throws IOException { Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap(); - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath); + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration); if (carbonFile instanceof AbstractDFSCarbonFile) { PathFilter pathFilter = new PathFilter() { @Override public boolean accept(Path path) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index c9601c0..fa982be 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1277,34 +1277,6 @@ public final class CarbonUtil { } /** - * Below method will be used to get all the block index info from index file - * - * @param taskId task id of the file - * @param tableBlockInfoList list of table block - * @param identifier absolute table identifier - * @return list of block info - * @throws IOException 
if any problem while reading - */ - public static List<DataFileFooter> readCarbonIndexFile(String taskId, String bucketNumber, - List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier identifier) - throws IOException { - // need to sort the block info list based for task in ascending order so - // it will be sinkup with block index read from file - Collections.sort(tableBlockInfoList); - // geting the index file path - //TODO need to pass proper partition number when partiton will be supported - String carbonIndexFilePath = CarbonTablePath - .getCarbonIndexFilePath(identifier.getTablePath(), taskId, - tableBlockInfoList.get(0).getSegmentId(), - bucketNumber, CarbonTablePath.DataFileUtil - .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()), - tableBlockInfoList.get(0).getVersion()); - DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); - // read the index info and return - return fileFooterConverter.getIndexInfo(carbonIndexFilePath, tableBlockInfoList); - } - - /** * initialize the value of dictionary chunk that can be kept in memory at a time * * @return @@ -2243,17 +2215,17 @@ public final class CarbonUtil { } } - public static String getFilePathExternalFilePath(String path) { + public static String getFilePathExternalFilePath(String path, Configuration configuration) { // return the list of carbondata files in the given path. 
- CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path)); + CarbonFile segment = FileFactory.getCarbonFile(path, configuration); CarbonFile[] dataFiles = segment.listFiles(); for (CarbonFile dataFile : dataFiles) { if (dataFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) { return dataFile.getAbsolutePath(); } else if (dataFile.isDirectory()) { - return getFilePathExternalFilePath(dataFile.getAbsolutePath()); + return getFilePathExternalFilePath(dataFile.getAbsolutePath(), configuration); } } return null; @@ -2265,12 +2237,14 @@ public final class CarbonUtil { * @return table info containing the schema */ public static org.apache.carbondata.format.TableInfo inferSchema(String carbonDataFilePath, - String tableName, boolean isCarbonFileProvider) throws IOException { + String tableName, boolean isCarbonFileProvider, Configuration configuration) + throws IOException { String fistFilePath = null; if (isCarbonFileProvider) { - fistFilePath = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null"); + fistFilePath = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null", + configuration); } else { - fistFilePath = getFilePathExternalFilePath(carbonDataFilePath); + fistFilePath = getFilePathExternalFilePath(carbonDataFilePath, configuration); } if (fistFilePath == null) { // Check if we can infer the schema from the hive metastore. 
@@ -2645,7 +2619,7 @@ public final class CarbonUtil { HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>(); Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getLocationMap(); if (locationMap != null) { - fileStore.readIndexFiles(); + fileStore.readIndexFiles(FileFactory.getConfiguration()); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); // get the size of carbonindex file carbonIndexSize = getCarbonIndexSize(fileStore, locationMap); @@ -3192,8 +3166,7 @@ public final class CarbonUtil { * @param carbonTable * carbon Table */ - public static ColumnarFormatVersion getFormatVersion(CarbonTable carbonTable) - throws IOException { + public static ColumnarFormatVersion getFormatVersion(CarbonTable carbonTable) throws IOException { String segmentPath = null; boolean supportFlatFolder = carbonTable.isSupportFlatFolder(); CarbonIndexFileReader indexReader = new CarbonIndexFileReader(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java index 670536e..61b4f37 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java @@ -33,12 +33,22 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.reader.CarbonFooterReader; import org.apache.carbondata.format.FileFooter; +import org.apache.hadoop.conf.Configuration; + /** * Below class will be used to convert the thrift object of data file * meta data to wrapper object */ public class DataFileFooterConverter extends AbstractDataFileFooterConverter { + public 
DataFileFooterConverter(Configuration configuration) { + super(configuration); + } + + public DataFileFooterConverter() { + super(FileFactory.getConfiguration()); + } + /** * Below method will be used to convert thrift file meta to wrapper file meta */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java index b20a336..db52991 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; @@ -29,6 +30,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.reader.CarbonFooterReader; import org.apache.carbondata.format.FileFooter; +import org.apache.hadoop.conf.Configuration; + /** * Below class will be used to convert the thrift object of data file * meta data to wrapper object for version 2 data file @@ -36,6 +39,14 @@ import org.apache.carbondata.format.FileFooter; public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter { + public DataFileFooterConverter2(Configuration configuration) { + super(configuration); + } + + public DataFileFooterConverter2() { + super(FileFactory.getConfiguration()); + } + /** * Below method will be used to convert 
thrift file meta to wrapper file meta */ @@ -136,6 +147,6 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter { } @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException { - return new DataFileFooterConverter().getSchema(tableBlockInfo); + return new DataFileFooterConverter(configuration).getSchema(tableBlockInfo); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java index 6a968b4..41e22fd 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; @@ -31,8 +32,18 @@ import org.apache.carbondata.core.reader.CarbonHeaderReader; import org.apache.carbondata.format.FileFooter3; import org.apache.carbondata.format.FileHeader; +import org.apache.hadoop.conf.Configuration; + public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter { + public DataFileFooterConverterV3(Configuration configuration) { + super(configuration); + } + + public DataFileFooterConverterV3() { + super(FileFactory.getConfiguration()); + } + /** * Below method will be used to convert thrift file meta to wrapper file meta * This method will read the footer from 
footer offset present in the data file http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java index 1634091..9dde6b7 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.fileoperations.FileWriteOperation; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.SegmentFileStore; @@ -77,7 +78,8 @@ public class CarbonIndexFileMergeWriter { List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles(); indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]); } else { - indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath); + indexFiles = + SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration()); } if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) { if (sfs == null) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java 
b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java index a3acfab..34dca0b 100644 --- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java +++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java @@ -40,10 +40,12 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope; import mockit.Deencapsulation; import mockit.Mock; import mockit.MockUp; +import org.apache.hadoop.conf.Configuration; import org.junit.Before; import org.junit.Test; @@ -103,7 +105,8 @@ public class TestBlockletDataMapFactory { BlockletDataMapIndexWrapper.class); method.setAccessible(true); method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifierWrapper, - new BlockletDataMapIndexWrapper(tableBlockIndexUniqueIdentifier.getSegmentId(), dataMaps)); + new BlockletDataMapIndexWrapper(tableBlockIndexUniqueIdentifier.getSegmentId(), dataMaps, + tableBlockIndexUniqueIdentifierWrapper.getConfiguration())); BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifierWrapper); assert null != result; } @@ -111,7 +114,8 @@ public class TestBlockletDataMapFactory { @Test public void getValidDistributables() throws IOException { BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable( "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex"); - Segment segment = new Segment("0", null); + Segment segment = new Segment("0", null, new TableStatusReadCommittedScope(carbonTable + .getAbsoluteTableIdentifier(), new Configuration(false))); 
blockletDataMapDistributable.setSegment(segment); BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable( "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756701.carbonindex"); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java index 5520bfb..a4abc61 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java @@ -684,8 +684,8 @@ public class CarbonUtilTest { @Test public void testToGetSegmentString() { List<Segment> list = new ArrayList<>(); - list.add(new Segment("1", null)); - list.add(new Segment("2", null)); + list.add(new Segment("1", null, null)); + list.add(new Segment("2", null, null)); String segments = CarbonUtil.convertToString(list); assertEquals(segments, "1,2"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java index e506994..2705b63 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java @@ -46,6 +46,7 @@ import org.apache.carbondata.format.IndexHeader; import mockit.Mock; import mockit.MockUp; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; import static 
junit.framework.TestCase.assertEquals; @@ -143,7 +144,7 @@ public class DataFileFooterConverterTest { new MockUp<FileFactory>() { @SuppressWarnings("unused") @Mock public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType, - int bufferSize) { + int bufferSize, Configuration configuration) { return dataInputStream; } }; http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 0d240c4..1e5b79c 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -237,7 +237,7 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa } for (String shard : shardPaths) { BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap(); - bloomDM.init(new BloomDataMapModel(shard, cache)); + bloomDM.init(new BloomDataMapModel(shard, cache, segment.getConfiguration())); bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns()); dataMaps.add(bloomDM); } @@ -253,7 +253,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>(); BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap(); String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath(); - bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache)); + bloomCoarseGrainDataMap + .init(new BloomDataMapModel(indexPath, cache, FileFactory.getConfiguration())); 
bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns()); coarseGrainDataMaps.add(bloomCoarseGrainDataMap); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java index 9d5d741..7ae4906 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java @@ -19,13 +19,16 @@ package org.apache.carbondata.datamap.bloom; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.hadoop.conf.Configuration; + public class BloomDataMapModel extends DataMapModel { private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache; public BloomDataMapModel(String filePath, - Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache) { - super(filePath); + Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache, + Configuration configuration) { + super(filePath, configuration); this.cache = cache; }
