Repository: carbondata Updated Branches: refs/heads/master 668bfdd50 -> 8b33ab240
[CARBONDATA-2376] Improve Lucene datamap performance by eliminating blockid while writing and reading index Problem: Currently DataMap interface implementations use blockid and blockletid while writing index files. Actually, blockid is not needed to store in index files as it only requires blockletid. So it adds more memory and disk size to write index files. Solution: Use taskname as index name to identify the indexname. And filter the blocklets directly by avoiding blockids. And pass the taskName as indexname to identify the blockid from blockletdatamap. Corrected the implementations of LuceneDatamap, BloomFilterDataMap, CGDataMap, FGDataMap and MinMaxDataMap. This closes #2206 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8b33ab24 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8b33ab24 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8b33ab24 Branch: refs/heads/master Commit: 8b33ab240126e999e9196369025917370172eee4 Parents: 668bfdd Author: ravipesala <[email protected]> Authored: Sun Apr 22 10:19:53 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Mon Apr 23 16:28:35 2018 +0800 ---------------------------------------------------------------------- .../apache/carbondata/core/datamap/Segment.java | 12 +-- .../core/datamap/dev/DataMapWriter.java | 2 +- .../dev/fgdatamap/FineGrainBlocklet.java | 4 +- .../carbondata/core/indexstore/Blocklet.java | 18 ++-- .../core/indexstore/ExtendedBlocklet.java | 2 +- .../blockletindex/BlockletDataMapFactory.java | 3 +- .../table/DiskBasedDMSchemaStorageProvider.java | 4 +- .../core/util/path/CarbonTablePath.java | 11 ++- .../datamap/bloom/BloomCoarseGrainDataMap.java | 22 ++--- .../bloom/BloomCoarseGrainDataMapFactory.java | 15 ++-- .../carbondata/datamap/bloom/BloomDMModel.java | 9 +- .../datamap/bloom/BloomDataMapWriter.java | 43 +++++----- .../bloom/BloomCoarseGrainDataMapSuite.scala | 4 +- 
datamap/examples/pom.xml | 5 ++ .../datamap/examples/MinMaxDataWriter.java | 11 ++- .../datamap/examples/MinMaxDataMapSuite.scala | 2 - .../lucene/LuceneDataMapFactoryBase.java | 6 +- .../datamap/lucene/LuceneDataMapWriter.java | 53 ++++-------- .../datamap/lucene/LuceneFineGrainDataMap.java | 90 ++++++++++---------- .../hadoop/api/CarbonInputFormat.java | 7 +- .../testsuite/datamap/CGDataMapTestCase.scala | 52 +++++------ .../testsuite/datamap/DataMapWriterSuite.scala | 2 +- .../testsuite/datamap/FGDataMapTestCase.scala | 59 +++++++------ .../testsuite/datamap/TestDataMapStatus.scala | 2 +- .../TestInsertAndOtherCommandConcurrent.scala | 2 +- .../datamap/DataMapWriterListener.java | 4 +- .../store/writer/AbstractFactDataWriter.java | 5 +- 27 files changed, 218 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java index ce0b90b..251ea38 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -42,9 +42,9 @@ public class Segment implements Serializable { private String segmentFileName; /** - * List of index files which are already got filtered through CG index operation. + * List of index shards which are already got filtered through CG index operation. */ - private Set<String> filteredIndexFiles = new HashSet<>(); + private Set<String> filteredIndexShardNames = new HashSet<>(); /** * Points to the Read Committed Scope of the segment. 
This is a flavor of @@ -149,12 +149,12 @@ public class Segment implements Serializable { return null; } - public Set<String> getFilteredIndexFiles() { - return filteredIndexFiles; + public Set<String> getFilteredIndexShardNames() { + return filteredIndexShardNames; } - public void setFilteredIndexFile(String filteredIndexFile) { - this.filteredIndexFiles.add(filteredIndexFile); + public void setFilteredIndexShardName(String filteredIndexShardName) { + this.filteredIndexShardNames.add(filteredIndexShardName); } @Override public boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java index 29670a1..1933f70 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java @@ -54,7 +54,7 @@ public abstract class DataMapWriter { * * @param blockId file name of the carbondata file */ - public abstract void onBlockStart(String blockId, long taskId) throws IOException; + public abstract void onBlockStart(String blockId, String indexShardName) throws IOException; /** * End of block notification http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java index b42500f..9c78cc8 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/fgdatamap/FineGrainBlocklet.java @@ -41,8 +41,8 @@ public class FineGrainBlocklet extends Blocklet implements Serializable { private List<Page> pages; - public FineGrainBlocklet(String blockId, String blockletId, List<Page> pages) { - super(blockId, blockletId); + public FineGrainBlocklet(String taskName, String blockletId, List<Page> pages) { + super(taskName, blockletId); this.pages = pages; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java index c3eda6b..9b40be4 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/Blocklet.java @@ -29,13 +29,13 @@ import org.apache.carbondata.core.metadata.schema.table.Writable; public class Blocklet implements Writable,Serializable { /** file path of this blocklet */ - private String blockId; + private String taskName; /** id to identify the blocklet inside the block (it is a sequential number) */ private String blockletId; - public Blocklet(String blockId, String blockletId) { - this.blockId = blockId; + public Blocklet(String taskName, String blockletId) { + this.taskName = taskName; this.blockletId = blockletId; } @@ -47,17 +47,17 @@ public class Blocklet implements Writable,Serializable { return blockletId; } - public String getBlockId() { - return blockId; + public String getTaskName() { + return taskName; } @Override public void write(DataOutput out) throws IOException { - out.writeUTF(blockId); + out.writeUTF(taskName); out.writeUTF(blockletId); } @Override public 
void readFields(DataInput in) throws IOException { - blockId = in.readUTF(); + taskName = in.readUTF(); blockletId = in.readUTF(); } @@ -67,7 +67,7 @@ public class Blocklet implements Writable,Serializable { Blocklet blocklet = (Blocklet) o; - if (blockId != null ? !blockId.equals(blocklet.blockId) : blocklet.blockId != null) { + if (taskName != null ? !taskName.equals(blocklet.taskName) : blocklet.taskName != null) { return false; } return blockletId != null ? @@ -76,7 +76,7 @@ public class Blocklet implements Writable,Serializable { } @Override public int hashCode() { - int result = blockId != null ? blockId.hashCode() : 0; + int result = taskName != null ? taskName.hashCode() : 0; result = 31 * result + (blockletId != null ? blockletId.hashCode() : 0); return result; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java index d2af5cb..ea2752c 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java @@ -66,7 +66,7 @@ public class ExtendedBlocklet extends Blocklet { } public String getPath() { - return getBlockId(); + return getTaskName(); } public String getDataMapWriterPath() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java 
index caac733..7c6427d 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -145,9 +145,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory private ExtendedBlocklet getExtendedBlocklet(List<TableBlockIndexUniqueIdentifier> identifiers, Blocklet blocklet) throws IOException { - String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId()); for (TableBlockIndexUniqueIdentifier identifier : identifiers) { - if (identifier.getIndexFileName().equals(carbonIndexFileName)) { + if (identifier.getIndexFileName().startsWith(blocklet.getTaskName())) { DataMap dataMap = cache.get(identifier); return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java index 9168f55..e1a929c 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java @@ -108,8 +108,8 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro for (DataMapSchema dataMapSchema : this.dataMapSchemas) { List<RelationIdentifier> parentTables = dataMapSchema.getParentTables(); for (RelationIdentifier identifier : parentTables) { - if (identifier.getTableName().equals(carbonTable.getTableName()) && - 
identifier.getDatabaseName().equals(carbonTable.getDatabaseName())) { + if (identifier.getTableName().equalsIgnoreCase(carbonTable.getTableName()) && + identifier.getDatabaseName().equalsIgnoreCase(carbonTable.getDatabaseName())) { dataMapSchemas.add(dataMapSchema); break; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 6de26ad..0538e7f 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -643,8 +643,17 @@ public class CarbonTablePath { } public static String getCarbonIndexFileName(String actualBlockName) { + return getUniqueTaskName(actualBlockName) + INDEX_FILE_EXT; + } + + /** + * Unique task name + * @param actualBlockName + * @return + */ + public static String getUniqueTaskName(String actualBlockName) { return DataFileUtil.getTaskNo(actualBlockName) + "-" + DataFileUtil.getBucketNo(actualBlockName) - + "-" + DataFileUtil.getTimeStampFromFileName(actualBlockName) + INDEX_FILE_EXT; + + "-" + DataFileUtil.getTimeStampFromFileName(actualBlockName); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java index 6e1a2eb..81cccf2 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java +++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java @@ -36,7 +36,6 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; -import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.expression.ColumnExpression; @@ -68,10 +67,12 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap { private List<BloomDMModel> bloomIndexList; private Multimap<String, List<BloomDMModel>> indexCol2BloomDMList; public static final String BLOOM_INDEX_SUFFIX = ".bloomindex"; + private String shardName; @Override - public void init(DataMapModel dataMapModel) throws MemoryException, IOException { + public void init(DataMapModel dataMapModel) throws IOException { Path indexPath = FileFactory.getPath(dataMapModel.getFilePath()); + this.shardName = indexPath.getName(); FileSystem fs = FileFactory.getFileSystem(indexPath); if (!fs.exists(indexPath)) { throw new IOException( @@ -93,8 +94,9 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap { indexCol2BloomDMList = ArrayListMultimap.create(); for (int i = 0; i < indexFileStatus.length; i++) { indexFilePath[i] = indexFileStatus[i].getPath().toString(); - String indexCol = StringUtils.substringBetween(indexFilePath[i], ".carbondata.", - BLOOM_INDEX_SUFFIX); + String indexfilename = indexFileStatus[i].getPath().getName(); + String indexCol = + indexfilename.substring(0, indexfilename.length() - BLOOM_INDEX_SUFFIX.length()); indexedColumn.add(indexCol); bloomIndexList.addAll(readBloomIndex(indexFilePath[i])); indexCol2BloomDMList.put(indexCol, readBloomIndex(indexFilePath[i])); @@ -149,15 +151,15 @@ public class 
BloomCoarseGrainDataMap extends CoarseGrainDataMap { convertValueToBytes(bloomQueryModel.dataType, bloomQueryModel.filterValue)); if (scanRequired) { LOGGER.info(String.format( - "BloomCoarseGrainDataMap: Need to scan block#%s -> blocklet#%s", - bloomDMModel.getBlockId(), String.valueOf(bloomDMModel.getBlockletNo()))); - Blocklet blocklet = new Blocklet(bloomDMModel.getBlockId(), - String.valueOf(bloomDMModel.getBlockletNo())); + "BloomCoarseGrainDataMap: Need to scan -> blocklet#%s", + String.valueOf(bloomDMModel.getBlockletNo()))); + Blocklet blocklet = + new Blocklet(shardName, String.valueOf(bloomDMModel.getBlockletNo())); hitBlocklets.add(blocklet); } else { LOGGER.info(String.format( - "BloomCoarseGrainDataMap: Skip scan block#%s -> blocklet#%s", - bloomDMModel.getBlockId(), String.valueOf(bloomDMModel.getBlockletNo()))); + "BloomCoarseGrainDataMap: Skip scan -> blocklet#%s", + String.valueOf(bloomDMModel.getBlockletNo()))); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 3430a65..a2f9693 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -38,7 +38,6 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.memory.MemoryException; import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; @@ -168,15 +167,19 @@ public class BloomCoarseGrainDataMapFactory implements DataMapFactory<CoarseGrai @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException { List<CoarseGrainDataMap> dataMaps = new ArrayList<CoarseGrainDataMap>(1); - BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap(); try { - bloomDM.init(new DataMapModel(BloomDataMapWriter.genDataMapStorePath( + String dataMapStorePath = BloomDataMapWriter.genDataMapStorePath( CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segment.getSegmentNo()), - dataMapName))); - } catch (MemoryException e) { + dataMapName); + CarbonFile[] carbonFiles = FileFactory.getCarbonFile(dataMapStorePath).listFiles(); + for (CarbonFile carbonFile : carbonFiles) { + BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap(); + bloomDM.init(new DataMapModel(carbonFile.getAbsolutePath())); + dataMaps.add(bloomDM); + } + } catch (Exception e) { throw new IOException("Error occurs while init Bloom DataMap", e); } - dataMaps.add(bloomDM); return dataMaps; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java index b72f08f..6351199 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDMModel.java @@ -25,20 +25,14 @@ import com.google.common.hash.BloomFilter; @InterfaceAudience.Internal public class BloomDMModel 
implements Serializable { private static final long serialVersionUID = 7281578747306832771L; - private String blockId; private int blockletNo; private BloomFilter<byte[]> bloomFilter; - public BloomDMModel(String blockId, int blockletNo, BloomFilter<byte[]> bloomFilter) { - this.blockId = blockId; + public BloomDMModel(int blockletNo, BloomFilter<byte[]> bloomFilter) { this.blockletNo = blockletNo; this.bloomFilter = bloomFilter; } - public String getBlockId() { - return blockId; - } - public int getBlockletNo() { return blockletNo; } @@ -49,7 +43,6 @@ public class BloomDMModel implements Serializable { @Override public String toString() { final StringBuilder sb = new StringBuilder("BloomDMModel{"); - sb.append("blockId='").append(blockId).append('\''); sb.append(", blockletNo=").append(blockletNo); sb.append(", bloomFilter=").append(bloomFilter); sb.append('}'); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java index 4065523..76ee084 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.DataMapWriter; @@ -38,8 +39,6 @@ import org.apache.carbondata.core.util.CarbonUtil; import com.google.common.hash.BloomFilter; import 
com.google.common.hash.Funnels; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; /** * BloomDataMap is constructed in blocklet level. For each indexed column, a bloom filter is @@ -55,7 +54,7 @@ public class BloomDataMapWriter extends DataMapWriter { // map column name to ordinal in pages private Map<String, Integer> col2Ordianl; private Map<String, DataType> col2DataType; - private String currentBlockId; + private String indexShardName; private int currentBlockletId; private List<String> currentDMFiles; private List<DataOutputStream> currentDataOutStreams; @@ -80,22 +79,16 @@ public class BloomDataMapWriter extends DataMapWriter { } @Override - public void onBlockStart(String blockId, long taskId) throws IOException { - this.currentBlockId = blockId; - this.currentBlockletId = 0; - currentDMFiles.clear(); - currentDataOutStreams.clear(); - currentObjectOutStreams.clear(); - initDataMapFile(); + public void onBlockStart(String blockId, String indexShardName) throws IOException { + if (this.indexShardName == null) { + this.indexShardName = indexShardName; + initDataMapFile(); + } } @Override public void onBlockEnd(String blockId) throws IOException { - for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) { - CarbonUtil.closeStreams(this.currentDataOutStreams.get(indexColId), - this.currentObjectOutStreams.get(indexColId)); - commitFile(this.currentDMFiles.get(indexColId)); - } + } @Override @@ -159,9 +152,11 @@ public class BloomDataMapWriter extends DataMapWriter { private void initDataMapFile() throws IOException { String dataMapDir = genDataMapStorePath(this.writeDirectoryPath, this.dataMapName); + dataMapDir = dataMapDir + CarbonCommonConstants.FILE_SEPARATOR + this.indexShardName; + FileFactory.mkdirs(dataMapDir, FileFactory.getFileType(dataMapDir)); for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) { - String dmFile = dataMapDir + File.separator + this.currentBlockId - + '.' 
+ indexedColumns.get(indexColId) + BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX; + String dmFile = dataMapDir + CarbonCommonConstants.FILE_SEPARATOR + + indexedColumns.get(indexColId) + BloomCoarseGrainDataMap.BLOOM_INDEX_SUFFIX; DataOutputStream dataOutStream = null; ObjectOutputStream objectOutStream = null; try { @@ -182,7 +177,7 @@ public class BloomDataMapWriter extends DataMapWriter { private void writeBloomDataMapFile() throws IOException { for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) { - BloomDMModel model = new BloomDMModel(this.currentBlockId, this.currentBlockletId, + BloomDMModel model = new BloomDMModel(this.currentBlockletId, indexBloomFilters.get(indexColId)); // only in higher version of guava-bloom-filter, it provides readFrom/writeTo interface. // In lower version, we use default java serializer to write bloomfilter. @@ -194,7 +189,11 @@ public class BloomDataMapWriter extends DataMapWriter { @Override public void finish() throws IOException { - + for (int indexColId = 0; indexColId < indexedColumns.size(); indexColId++) { + CarbonUtil.closeStreams(this.currentDataOutStreams.get(indexColId), + this.currentObjectOutStreams.get(indexColId)); + commitFile(this.currentDMFiles.get(indexColId)); + } } @Override @@ -213,11 +212,7 @@ public class BloomDataMapWriter extends DataMapWriter { public static String genDataMapStorePath(String dataPath, String dataMapName) throws IOException { String dmDir = dataPath + File.separator + dataMapName; - Path dmPath = FileFactory.getPath(dmDir); - FileSystem fs = FileFactory.getFileSystem(dmPath); - if (!fs.exists(dmPath)) { - fs.mkdirs(dmPath); - } + FileFactory.mkdirs(dmDir, FileFactory.getFileType(dmDir)); return dmDir; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git 
a/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala index e7bab95..21283fe 100644 --- a/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala +++ b/datamap/bloom/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala @@ -75,8 +75,8 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { sql(s"select * from $bloomDMSampleTable limit 5").show(false) checkExistence(sql(s"show datamap on table $bloomDMSampleTable"), true, dataMapName) - checkAnswer(sql(s"show datamap on table $bloomDMSampleTable"), - Row(dataMapName, classOf[BloomCoarseGrainDataMapFactory].getName, "(NA)")) +// checkAnswer(sql(s"show datamap on table $bloomDMSampleTable"), +// Row(dataMapName, classOf[BloomCoarseGrainDataMapFactory].getName, "(NA)")) checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 1"), sql(s"select * from $normalTable where id = 1")) checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 999"), http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/examples/pom.xml ---------------------------------------------------------------------- diff --git a/datamap/examples/pom.xml b/datamap/examples/pom.xml index 30e1522..a22e902 100644 --- a/datamap/examples/pom.xml +++ b/datamap/examples/pom.xml @@ -41,6 +41,11 @@ <artifactId>carbondata-spark2</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java 
---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java index e68b481..21a0b8e 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java @@ -61,6 +61,7 @@ public class MinMaxDataWriter extends DataMapWriter { private String dataMapName; private int columnCnt; private DataType[] dataTypeArray; + private String indexShardName; /** * Since the sequence of indexed columns is defined the same as order in user-created, so @@ -90,12 +91,14 @@ public class MinMaxDataWriter extends DataMapWriter { } } - @Override public void onBlockStart(String blockId, long taskId) { - blockMinMaxMap = new HashMap<Integer, BlockletMinMax>(); + @Override public void onBlockStart(String blockId, String indexShardName) { + if (blockMinMaxMap == null) { + blockMinMaxMap = new HashMap<>(); + this.indexShardName = indexShardName; + } } @Override public void onBlockEnd(String blockId) { - updateMinMaxIndex(blockId); } @Override public void onBlockletStart(int blockletId) { @@ -300,7 +303,7 @@ public class MinMaxDataWriter extends DataMapWriter { } @Override public void finish() throws IOException { - + updateMinMaxIndex(indexShardName); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala 
b/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala index b568536..8436e07 100644 --- a/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala +++ b/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala @@ -69,8 +69,6 @@ class MinMaxDataMapSuite extends QueryTest with BeforeAndAfterAll { """.stripMargin) sql(s"show datamap on table $minMaxDMSampleTable").show(false) - checkAnswer(sql(s"show datamap on table $minMaxDMSampleTable"), - Row(dataMapName, classOf[MinMaxIndexDataMapFactory].getName, "(NA)")) // not that the table will use default dimension as sort_columns, so for the following cases, // the pruning result will differ. // 1 blocklet http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java index 672880f..a7b8831 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java @@ -210,10 +210,14 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac */ @Override public List<DataMapDistributable> toDistributable(Segment segment) { - List<DataMapDistributable> lstDataMapDistribute = new ArrayList<DataMapDistributable>(); + List<DataMapDistributable> lstDataMapDistribute = new ArrayList<>(); CarbonFile[] indexDirs = LuceneDataMapWriter .getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName); for (CarbonFile indexDir : 
indexDirs) { + // Filter out the tasks which are filtered through CG datamap. + if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) { + continue; + } DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable( CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()), indexDir.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java index 6cc89d6..2025f73 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java @@ -29,7 +29,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -52,7 +51,6 @@ import org.apache.lucene.document.IntPoint; import org.apache.lucene.document.IntRangeField; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.StoredField; -import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; @@ -78,16 +76,12 @@ public class LuceneDataMapWriter extends 
DataMapWriter { private Analyzer analyzer = null; - private String blockId = null; - private String dataMapName = null; private boolean isFineGrain = true; private List<String> indexedCarbonColumns = null; - private static final String BLOCKID_NAME = "blockId"; - private static final String BLOCKLETID_NAME = "blockletId"; private static final String PAGEID_NAME = "pageId"; @@ -102,24 +96,25 @@ public class LuceneDataMapWriter extends DataMapWriter { this.indexedCarbonColumns = indexedCarbonColumns; } - private String getIndexPath(long taskId) { + private String getIndexPath(String taskName) { if (isFineGrain) { - return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName, taskId); + return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName, + taskName); } else { // TODO: where write data in coarse grain data map - return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName, taskId); + return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName, + taskName); } } /** * Start of new block notification. 
*/ - public void onBlockStart(String blockId, long taskId) throws IOException { + public void onBlockStart(String blockId, String indexShardName) throws IOException { // save this block id for lucene index , used in onPageAdd function - this.blockId = blockId; // get index path, put index data into segment's path - String strIndexPath = getIndexPath(taskId); + String strIndexPath = getIndexPath(indexShardName); Path indexPath = FileFactory.getPath(strIndexPath); FileSystem fs = FileFactory.getFileSystem(indexPath); @@ -154,13 +149,6 @@ public class LuceneDataMapWriter extends DataMapWriter { * End of block notification */ public void onBlockEnd(String blockId) throws IOException { - // clean this block id - this.blockId = null; - - // finished a file , close this index writer - if (indexWriter != null) { - indexWriter.close(); - } } @@ -201,10 +189,6 @@ public class LuceneDataMapWriter extends DataMapWriter { for (int rowId = 0; rowId < pageSize; rowId++) { // create a new document Document doc = new Document(); - - // add block id, save this id - doc.add(new StringField(BLOCKID_NAME, blockId, Field.Store.YES)); - // add blocklet Id doc.add(new IntPoint(BLOCKLETID_NAME, new int[] { blockletId })); doc.add(new StoredField(BLOCKLETID_NAME, blockletId)); @@ -339,7 +323,10 @@ public class LuceneDataMapWriter extends DataMapWriter { * class. 
*/ public void finish() throws IOException { - + // finished a file , close this index writer + if (indexWriter != null) { + indexWriter.close(); + } } /** @@ -350,16 +337,16 @@ public class LuceneDataMapWriter extends DataMapWriter { } /** - * Return store path for datamap based on the taskId, if three tasks get launched during loading, + * Return store path for datamap based on the taskName,if three tasks get launched during loading, * then three folders will be created based on the three task Ids and lucene index file will be * written into those folders + * * @return store path based on taskID */ private static String genDataMapStorePathOnTaskId(String tablePath, String segmentId, - String dataMapName, long taskId) { + String dataMapName, String taskName) { return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName - + File.separator + dataMapName + CarbonCommonConstants.UNDERSCORE + taskId - + CarbonCommonConstants.UNDERSCORE + System.currentTimeMillis(); + + File.separator + taskName; } /** @@ -375,14 +362,6 @@ public class LuceneDataMapWriter extends DataMapWriter { CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName; FileFactory.FileType fileType = FileFactory.getFileType(dmPath); final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType); - return dirPath.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile file) { - if (file.isDirectory() && file.getName().startsWith(dataMapName)) { - return true; - } else { - return false; - } - } - }); + return dirPath.listFiles(); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java index 3caefd2..cf5e2da 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java @@ -18,7 +18,12 @@ package org.apache.carbondata.datamap.lucene; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; @@ -29,7 +34,6 @@ import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.PartitionSpec; -import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.intf.ExpressionType; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; @@ -45,20 +49,21 @@ import org.apache.lucene.index.IndexableField; import org.apache.lucene.queryparser.classic.MultiFieldQueryParser; import org.apache.lucene.queryparser.classic.ParseException; import org.apache.lucene.queryparser.classic.QueryParser; -import org.apache.lucene.search.*; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; import org.apache.solr.store.hdfs.HdfsDirectory; @InterfaceAudience.Internal public class LuceneFineGrainDataMap extends FineGrainDataMap { - private static final int BLOCKID_ID = 0; + private static final int BLOCKLETID_ID = 0; - 
private static final int BLOCKLETID_ID = 1; + private static final int PAGEID_ID = 1; - private static final int PAGEID_ID = 2; - - private static final int ROWID_ID = 3; + private static final int ROWID_ID = 2; /** * log information @@ -81,6 +86,8 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap { */ private Analyzer analyzer; + private String taskName; + LuceneFineGrainDataMap(Analyzer analyzer) { this.analyzer = analyzer; } @@ -88,12 +95,14 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap { /** * It is called to load the data map to memory or to initialize it. */ - public void init(DataMapModel dataMapModel) throws MemoryException, IOException { + public void init(DataMapModel dataMapModel) throws IOException { // get this path from file path Path indexPath = FileFactory.getPath(dataMapModel.getFilePath()); LOGGER.info("Lucene index read path " + indexPath.toString()); + this.taskName = indexPath.getName(); + // get file system , use hdfs file system , realized in solr project FileSystem fs = FileFactory.getFileSystem(indexPath); @@ -192,7 +201,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap { // temporary data, delete duplicated data // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>> - Map<String, Map<String, Map<Integer, Set<Integer>>>> mapBlocks = new HashMap<>(); + Map<String, Map<Integer, Set<Integer>>> mapBlocks = new HashMap<>(); for (ScoreDoc scoreDoc : result.scoreDocs) { // get a document @@ -201,20 +210,12 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap { // get all fields List<IndexableField> fieldsInDoc = doc.getFields(); - // get this block id Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>> - String blockId = fieldsInDoc.get(BLOCKID_ID).stringValue(); - Map<String, Map<Integer, Set<Integer>>> mapBlocklets = mapBlocks.get(blockId); - if (mapBlocklets == null) { - mapBlocklets = new HashMap<>(); - mapBlocks.put(blockId, mapBlocklets); - } - // get the blocklet id 
Map<BlockletId, Map<PageId, Set<RowId>>> String blockletId = fieldsInDoc.get(BLOCKLETID_ID).stringValue(); - Map<Integer, Set<Integer>> mapPageIds = mapBlocklets.get(blockletId); + Map<Integer, Set<Integer>> mapPageIds = mapBlocks.get(blockletId); if (mapPageIds == null) { mapPageIds = new HashMap<>(); - mapBlocklets.put(blockletId, mapPageIds); + mapBlocks.put(blockletId, mapPageIds); } // get the page id Map<PageId, Set<RowId>> @@ -235,37 +236,32 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap { // transform all blocks into result type blocklets // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>> - for (Map.Entry<String, Map<String, Map<Integer, Set<Integer>>>> mapBlock : + for (Map.Entry<String, Map<Integer, Set<Integer>>> mapBlocklet : mapBlocks.entrySet()) { - String blockId = mapBlock.getKey(); - Map<String, Map<Integer, Set<Integer>>> mapBlocklets = mapBlock.getValue(); - // for blocklets in this block Map<BlockletId, Map<PageId, Set<RowId>>> - for (Map.Entry<String, Map<Integer, Set<Integer>>> mapBlocklet : mapBlocklets.entrySet()) { - String blockletId = mapBlocklet.getKey(); - Map<Integer, Set<Integer>> mapPageIds = mapBlocklet.getValue(); - List<FineGrainBlocklet.Page> pages = new ArrayList<FineGrainBlocklet.Page>(); - - // for pages in this blocklet Map<PageId, Set<RowId>>> - for (Map.Entry<Integer, Set<Integer>> mapPageId : mapPageIds.entrySet()) { - // construct array rowid - int[] rowIds = new int[mapPageId.getValue().size()]; - int i = 0; - // for rowids in this page Set<RowId> - for (Integer rowid : mapPageId.getValue()) { - rowIds[i++] = rowid; - } - // construct one page - FineGrainBlocklet.Page page = new FineGrainBlocklet.Page(); - page.setPageId(mapPageId.getKey()); - page.setRowId(rowIds); - - // add this page into list pages - pages.add(page); + String blockletId = mapBlocklet.getKey(); + Map<Integer, Set<Integer>> mapPageIds = mapBlocklet.getValue(); + List<FineGrainBlocklet.Page> pages = new 
ArrayList<FineGrainBlocklet.Page>(); + + // for pages in this blocklet Map<PageId, Set<RowId>>> + for (Map.Entry<Integer, Set<Integer>> mapPageId : mapPageIds.entrySet()) { + // construct array rowid + int[] rowIds = new int[mapPageId.getValue().size()]; + int i = 0; + // for rowids in this page Set<RowId> + for (Integer rowid : mapPageId.getValue()) { + rowIds[i++] = rowid; } + // construct one page + FineGrainBlocklet.Page page = new FineGrainBlocklet.Page(); + page.setPageId(mapPageId.getKey()); + page.setRowId(rowIds); - // add a FineGrainBlocklet - blocklets.add(new FineGrainBlocklet(blockId, blockletId, pages)); + // add this page into list pages + pages.add(page); } + + // add a FineGrainBlocklet + blocklets.add(new FineGrainBlocklet(taskName, blockletId, pages)); } return blocklets; http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 2ff4961..38b46ea 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -466,15 +466,14 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { for (Segment segment : segments) { boolean found = false; // Clear the old pruned index files if any present - segment.getFilteredIndexFiles().clear(); + segment.getFilteredIndexShardNames().clear(); // Check the segment exist in any of the pruned blocklets. for (ExtendedBlocklet blocklet : prunedBlocklets) { if (blocklet.getSegmentId().equals(segment.getSegmentNo())) { found = true; // Set the pruned index file to the segment for further pruning. 
- String carbonIndexFileName = - CarbonTablePath.getCarbonIndexFileName(blocklet.getBlockId()); - segment.setFilteredIndexFile(carbonIndexFileName); + String uniqueTaskName = CarbonTablePath.getUniqueTaskName(blocklet.getTaskName()); + segment.setFilteredIndexShardName(uniqueTaskName); } } // Add to remove segments list if not present in pruned blocklets. http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala index ae21416..e428e24 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -71,12 +71,10 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory { * Get the datamap for segmentid */ override def getDataMaps(segment: Segment): java.util.List[CoarseGrainDataMap] = { - val file = FileFactory.getCarbonFile( - CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) + val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo) + val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName) - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) + val files = file.listFiles() files.map {f => val dataMap: CoarseGrainDataMap = new CGDataMap() dataMap.init(new DataMapModel(f.getCanonicalPath)) @@ -109,12 +107,10 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory { * @return */ 
override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = { - val file = FileFactory.getCarbonFile( - CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) + val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo) + val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName) - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) + val files = file.listFiles() files.map { f => val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) d @@ -153,23 +149,26 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory { class CGDataMap extends CoarseGrainDataMap { - var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _ + var maxMin: ArrayBuffer[(Int, (Array[Byte], Array[Byte]))] = _ var FileReader: FileReader = _ var filePath: String = _ val compressor = new SnappyCompressor + var taskName: String = _ /** * It is called to load the data map to memory or to initialize it. 
*/ override def init(dataMapModel: DataMapModel): Unit = { this.filePath = dataMapModel.getFilePath - val size = FileFactory.getCarbonFile(filePath).getSize + val carbonFile = FileFactory.getCarbonFile(filePath) + taskName = carbonFile.getName + val size = carbonFile.getSize FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)) val footerLen = FileReader.readInt(filePath, size-4) val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen) val in = new ByteArrayInputStream(compressor.unCompressByte(bytes)) val obj = new ObjectInputStream(in) - maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]] + maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(Int, (Array[Byte], Array[Byte]))]] } /** @@ -191,15 +190,15 @@ class CGDataMap extends CoarseGrainDataMap { } val meta = findMeta(value(0).getBytes) meta.map { f=> - new Blocklet(f._1, f._2 + "") + new Blocklet(taskName, f._1 + "") }.asJava } private def findMeta(value: Array[Byte]) = { val tuples = maxMin.filter { f => - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 && - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0 + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._2._1) <= 0 && + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._2._2) >= 0 } tuples } @@ -235,13 +234,11 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema) extends DataMapWriter(identifier, segment, dataWritePath) { - var currentBlockId: String = null - val cgwritepath = dataWritePath + "/" + - dataMapSchema.getDataMapName + System.nanoTime() + ".datamap" - lazy val stream: DataOutputStream = FileFactory - .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath)) + var taskName: String = _ + + val cgwritepath = dataWritePath + "/" + dataMapSchema.getDataMapName +"/" val blockletList = new ArrayBuffer[Array[Byte]]() - val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], 
Array[Byte]))]() + val maxMin = new ArrayBuffer[(Int, (Array[Byte], Array[Byte]))]() val compressor = new SnappyCompressor /** @@ -249,8 +246,8 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier, * * @param blockId file name of the carbondata file */ - override def onBlockStart(blockId: String, taskId: Long): Unit = { - currentBlockId = blockId + override def onBlockStart(blockId: String, taskName: String): Unit = { + this.taskName = taskName } /** @@ -278,7 +275,7 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier, val sorted = blockletList .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) maxMin += - ((currentBlockId+"", blockletId, (sorted.last, sorted.head))) + ((blockletId, (sorted.last, sorted.head))) blockletList.clear() } @@ -315,6 +312,9 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier, * class. */ override def finish(): Unit = { + FileFactory.mkdirs(cgwritepath, FileFactory.getFileType(cgwritepath)) + var stream: DataOutputStream = FileFactory + .getDataOutputStream(cgwritepath + "/"+taskName, FileFactory.getFileType(cgwritepath)) val out = new ByteOutputStream() val outStream = new ObjectOutputStream(out) outStream.writeObject(maxMin) @@ -323,7 +323,7 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier, stream.write(bytes) stream.writeInt(bytes.length) stream.close() - commitFile(cgwritepath) + commitFile(cgwritepath + "/"+taskName) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index 329c888..54a8dc2 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -210,7 +210,7 @@ object DataMapWriterSuite { * * @param blockId file name of the carbondata file */ - override def onBlockStart(blockId: String, taskId: Long) = { + override def onBlockStart(blockId: String, taskId: String) = { callbackSeq :+= s"block start $blockId" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala index d33191c..f7886a2 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -73,12 +73,10 @@ class FGDataMapFactory extends FineGrainDataMapFactory { * Get the datamap for segmentid */ override def getDataMaps(segment: Segment): java.util.List[FineGrainDataMap] = { - val file = FileFactory - .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) + val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo) + val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName) - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) + val files = file.listFiles() files.map { f => val dataMap: FineGrainDataMap = new 
FGDataMap() dataMap.init(new DataMapModel(f.getCanonicalPath)) @@ -102,12 +100,10 @@ class FGDataMapFactory extends FineGrainDataMapFactory { * @return */ override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = { - val file = FileFactory.getCarbonFile( - CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)) + val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo) + val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName) - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) + val files = file.listFiles() files.map { f => val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) d @@ -152,24 +148,27 @@ class FGDataMapFactory extends FineGrainDataMapFactory { class FGDataMap extends FineGrainDataMap { - var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _ + var maxMin: ArrayBuffer[(Int, (Array[Byte], Array[Byte]), Long, Int)] = _ var FileReader: FileReader = _ var filePath: String = _ val compressor = new SnappyCompressor + var taskName:String = _ /** * It is called to load the data map to memory or to initialize it. 
*/ override def init(dataMapModel: DataMapModel): Unit = { this.filePath = dataMapModel.getFilePath - val size = FileFactory.getCarbonFile(filePath).getSize + val carbonFile = FileFactory.getCarbonFile(filePath) + taskName = carbonFile.getName + val size = carbonFile.getSize FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)) val footerLen = FileReader.readInt(filePath, size - 4) val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen) val in = new ByteArrayInputStream(compressor.unCompressByte(bytes)) val obj = new ObjectInputStream(in) maxMin = obj.readObject() - .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]] + .asInstanceOf[ArrayBuffer[(Int, (Array[Byte], Array[Byte]), Long, Int)]] } /** @@ -195,9 +194,9 @@ class FGDataMap extends FineGrainDataMap { }.filter(_.isDefined).map(_.get).asJava } - private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int), + private def readAndFindData(meta: (Int, (Array[Byte], Array[Byte]), Long, Int), value: Array[Byte]): Option[FineGrainBlocklet] = { - val bytes = FileReader.readByteArray(filePath, meta._4, meta._5) + val bytes = FileReader.readByteArray(filePath, meta._3, meta._4) val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes)) val obj = new ObjectInputStream(outputStream) val blockletsData = obj.readObject() @@ -220,7 +219,7 @@ class FGDataMap extends FineGrainDataMap { pg.setRowId(f._2(p._2).toArray) pg } - Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava)) + Some(new FineGrainBlocklet(taskName, meta._1.toString, pages.toList.asJava)) } else { None } @@ -228,8 +227,8 @@ class FGDataMap extends FineGrainDataMap { private def findMeta(value: Array[Byte]) = { val tuples = maxMin.filter { f => - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 && - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0 + 
ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._2._1) >= 0 && + ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._2._2) <= 0 } tuples } @@ -263,13 +262,11 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier, segment: Segment, dataWriterPath: String, dataMapSchema: DataMapSchema) extends DataMapWriter(identifier, segment, dataWriterPath) { - var currentBlockId: String = null - val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() + - ".datamap" - val stream: DataOutputStream = FileFactory - .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath)) + var taskName: String = _ + val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName +"/" + var stream: DataOutputStream = _ val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]() - val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]() + val maxMin = new ArrayBuffer[(Int, (Array[Byte], Array[Byte]), Long, Int)]() var position: Long = 0 val compressor = new SnappyCompressor @@ -278,8 +275,13 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier, * * @param blockId file name of the carbondata file */ - override def onBlockStart(blockId: String, taskId: Long): Unit = { - currentBlockId = blockId + override def onBlockStart(blockId: String, taskId: String): Unit = { + this.taskName = taskId + if (stream == null) { + FileFactory.mkdirs(fgwritepath, FileFactory.getFileType(fgwritepath)) + stream = FileFactory + .getDataOutputStream(fgwritepath + "/"+taskName, FileFactory.getFileType(fgwritepath)) + } } /** @@ -336,7 +338,7 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier, val bytes = compressor.compressByte(out.getBytes) stream.write(bytes) maxMin += - ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last + ((blockletId, (blockletListUpdated.head._1, blockletListUpdated.last ._1), position, bytes.length)) position += bytes.length 
blockletList.clear() @@ -394,6 +396,7 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier, * class. */ override def finish(): Unit = { + FileFactory.mkdirs(fgwritepath, FileFactory.getFileType(fgwritepath)) val out = new ByteOutputStream() val outStream = new ObjectOutputStream(out) outStream.writeObject(maxMin) @@ -402,7 +405,7 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier, stream.write(bytes) stream.writeInt(bytes.length) stream.close() - commitFile(fgwritepath) + commitFile(fgwritepath + "/"+taskName) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala index e19c4b7..d48ac6b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala @@ -203,7 +203,7 @@ class TestDataMap() extends CoarseGrainDataMapFactory { override def onBlockletStart(blockletId: Int): Unit = { } - override def onBlockStart(blockId: String, taskId: Long): Unit = { + override def onBlockStart(blockId: String, taskId: String): Unit = { // trigger the second SQL to execute } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index 55e9bac..2b7bd46 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -307,7 +307,7 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory { override def onBlockletStart(blockletId: Int): Unit = { } - override def onBlockStart(blockId: String, taskId: Long): Unit = { + override def onBlockStart(blockId: String, taskId: String): Unit = { // trigger the second SQL to execute Global.overwriteRunning = true http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 6e7f2d6..a9e30d3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -91,10 +91,10 @@ public class DataMapWriterListener { LOG.info("DataMapWriter " + writer + " added"); } - public void onBlockStart(String blockId, String blockPath, long taskId) throws IOException { + public void onBlockStart(String blockId, String blockPath, String taskName) throws IOException { for (List<DataMapWriter> writers : registry.values()) { for (DataMapWriter writer : writers) { - writer.onBlockStart(blockId, taskId); 
+ writer.onBlockStart(blockId, taskName); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b33ab24/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 94ade87..e5c90e0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -250,8 +250,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { private void notifyDataMapBlockStart() { if (listener != null) { try { - listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath, - model.getCarbonDataFileAttributes().getTaskId()); + String taskName = CarbonTablePath.getUniqueTaskName(carbonDataFileName); + listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath, taskName); } catch (IOException e) { throw new CarbonDataWriterException("Problem while writing datamap", e); } @@ -266,7 +266,6 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { throw new CarbonDataWriterException("Problem while writing datamap", e); } } - blockletId = 0; } /**
