Repository: carbondata Updated Branches: refs/heads/master e3f98fa43 -> d3a09e279
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java index 64ff202..ed54a3b 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java @@ -25,11 +25,11 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.reader.CarbonDictionaryColumnMetaChunk; import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader; import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl; @@ -77,7 +77,7 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter { /** * column identifier */ - protected ColumnIdentifier columnIdentifier; + protected DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier; /** * carbon dictionary data store path @@ -132,12 +132,12 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter { * * @param storePath carbon dictionary data store path * @param carbonTableIdentifier table identifier which will give table name and database name - * @param columnIdentifier column unique identifier + * @param dictionaryColumnUniqueIdentifier column unique identifier */ - public CarbonDictionaryWriterImpl(String storePath, - CarbonTableIdentifier carbonTableIdentifier, ColumnIdentifier columnIdentifier) { + public CarbonDictionaryWriterImpl(String storePath, CarbonTableIdentifier carbonTableIdentifier, + DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier) { this.carbonTableIdentifier = carbonTableIdentifier; - this.columnIdentifier = columnIdentifier; + this.dictionaryColumnUniqueIdentifier = dictionaryColumnUniqueIdentifier; this.storePath = storePath; this.isFirstTime = true; } @@ -254,11 +254,13 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter { protected void initPaths() { PathService pathService = CarbonCommonFactory.getPathService(); - CarbonTablePath carbonTablePath = pathService.getCarbonTablePath( - this.storePath, carbonTableIdentifier); - this.dictionaryFilePath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId()); - this.dictionaryMetaFilePath = - carbonTablePath.getDictionaryMetaFilePath(columnIdentifier.getColumnId()); + CarbonTablePath carbonTablePath = pathService + .getCarbonTablePath(this.storePath, carbonTableIdentifier, + dictionaryColumnUniqueIdentifier); + this.dictionaryFilePath = carbonTablePath.getDictionaryFilePath( + dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId()); + this.dictionaryMetaFilePath = carbonTablePath.getDictionaryMetaFilePath( + dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId()); } /** @@ -290,17 +292,19 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter { int bytesToTruncate = 0; if (null != chunkMetaObjectForLastSegmentEntry) { bytesToTruncate = - (int) (chunk_start_offset - chunkMetaObjectForLastSegmentEntry.getEnd_offset()); + (int) (chunk_start_offset - chunkMetaObjectForLastSegmentEntry.getEnd_offset()); } if (bytesToTruncate > 0) { - LOGGER.info("some inconsistency in dictionary file for column " + this.columnIdentifier); + LOGGER.info("some inconsistency in dictionary file for column " + + this.dictionaryColumnUniqueIdentifier.getColumnIdentifier()); // truncate the dictionary data till chunk meta end offset FileFactory.FileType fileType = FileFactory.getFileType(this.dictionaryFilePath); CarbonFile carbonFile = FileFactory.getCarbonFile(this.dictionaryFilePath, fileType); boolean truncateSuccess = carbonFile .truncate(this.dictionaryFilePath, chunkMetaObjectForLastSegmentEntry.getEnd_offset()); if (!truncateSuccess) { - LOGGER.info("Diction file not truncated successfully for column " + this.columnIdentifier); + LOGGER.info("Diction file not truncated successfully for column " + + this.dictionaryColumnUniqueIdentifier.getColumnIdentifier()); } } } @@ -344,7 +348,8 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter { // write dictionary metadata file writeThriftObject(dictionaryChunkMeta); LOGGER.info("Dictionary metadata file written successfully for column " - + this.columnIdentifier + " at path " + this.dictionaryMetaFilePath); + + this.dictionaryColumnUniqueIdentifier.getColumnIdentifier() + " at path " + + this.dictionaryMetaFilePath); } finally { closeThriftWriter(); } @@ -426,7 +431,7 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter { */ protected CarbonDictionaryMetadataReader getDictionaryMetadataReader() { return new CarbonDictionaryMetadataReaderImpl(storePath, carbonTableIdentifier, - columnIdentifier); + dictionaryColumnUniqueIdentifier); } @Override public void commit() throws IOException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java index d4750da..b10da11 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImpl.java @@ -23,11 +23,11 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.service.CarbonCommonFactory; import org.apache.carbondata.core.service.PathService; import org.apache.carbondata.core.util.CarbonProperties; @@ -50,7 +50,7 @@ public class CarbonDictionarySortIndexWriterImpl implements CarbonDictionarySort /** * column name */ - protected ColumnIdentifier columnIdentifier; + protected DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier; /** * carbon store location @@ -79,12 +79,13 @@ public class CarbonDictionarySortIndexWriterImpl implements CarbonDictionarySort /** * @param carbonStorePath Carbon store path * @param carbonTableIdentifier table identifier which will give table name and database name - * @param columnIdentifier column unique identifier + * @param dictionaryColumnUniqueIdentifier column unique identifier */ public CarbonDictionarySortIndexWriterImpl(final CarbonTableIdentifier carbonTableIdentifier, - final ColumnIdentifier columnIdentifier, final String carbonStorePath) { + final DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier, + final String carbonStorePath) { this.carbonTableIdentifier = carbonTableIdentifier; - this.columnIdentifier = columnIdentifier; + this.dictionaryColumnUniqueIdentifier = dictionaryColumnUniqueIdentifier; this.carbonStorePath = carbonStorePath; } @@ -150,11 +151,14 @@ public class CarbonDictionarySortIndexWriterImpl implements CarbonDictionarySort protected void initPath() { PathService pathService = CarbonCommonFactory.getPathService(); CarbonTablePath carbonTablePath = pathService - .getCarbonTablePath(carbonStorePath, carbonTableIdentifier); - String dictionaryPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId()); + .getCarbonTablePath(carbonStorePath, carbonTableIdentifier, + dictionaryColumnUniqueIdentifier); + String dictionaryPath = carbonTablePath.getDictionaryFilePath( + dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId()); long dictOffset = CarbonUtil.getFileSize(dictionaryPath); - this.sortIndexFilePath = - carbonTablePath.getSortIndexFilePath(columnIdentifier.getColumnId(), dictOffset); + this.sortIndexFilePath = carbonTablePath + .getSortIndexFilePath(dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId(), + dictOffset); cleanUpOldSortIndex(carbonTablePath, dictionaryPath); } @@ -166,9 +170,8 @@ public class CarbonDictionarySortIndexWriterImpl implements CarbonDictionarySort protected void cleanUpOldSortIndex(CarbonTablePath carbonTablePath, String dictPath) { CarbonFile dictFile = FileFactory.getCarbonFile(dictPath, FileFactory.getFileType(dictPath)); - CarbonFile[] files = - carbonTablePath.getSortIndexFiles(dictFile.getParentFile(), - columnIdentifier.getColumnId()); + CarbonFile[] files = carbonTablePath.getSortIndexFiles(dictFile.getParentFile(), + dictionaryColumnUniqueIdentifier.getColumnIdentifier().getColumnId()); int maxTime; try { maxTime = Integer.parseInt(CarbonProperties.getInstance() http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/core/src/test/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java index 2fa2b93..957ea22 100644 --- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java +++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCacheTest.java @@ -105,7 +105,8 @@ public class AbstractDictionaryCacheTest { String columnId) { ColumnIdentifier columnIdentifier = new ColumnIdentifier(columnId, null, DataType.STRING); return new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier, - DataType.STRING); + DataType.STRING, + CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier)); } /** @@ -126,8 +127,12 @@ public class AbstractDictionaryCacheTest { protected void prepareWriterAndWriteData(List<String> data, String columnId) throws IOException { ColumnIdentifier columnIdentifier = new ColumnIdentifier(columnId, null, null); + DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = + new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier, + columnIdentifier.getDataType(), + CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier)); CarbonDictionaryWriter carbonDictionaryWriter = - new CarbonDictionaryWriterImpl(carbonStorePath, carbonTableIdentifier, columnIdentifier); + new CarbonDictionaryWriterImpl(carbonStorePath, carbonTableIdentifier, dictionaryColumnUniqueIdentifier); CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier); CarbonUtil.checkAndCreateFolder(carbonTablePath.getMetadataDirectoryPath()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryCacheLoaderImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryCacheLoaderImplTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryCacheLoaderImplTest.java index 0e17600..a751120 100644 --- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryCacheLoaderImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryCacheLoaderImplTest.java @@ -29,6 +29,7 @@ import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.reader.CarbonDictionaryReaderImpl; import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReaderImpl; +import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.format.ColumnDictionaryChunk; import mockit.Mock; @@ -43,10 +44,19 @@ public class DictionaryCacheLoaderImplTest { private static DictionaryCacheLoaderImpl dictionaryCacheLoader; private static DictionaryInfo dictionaryInfo; private static ColumnIdentifier columnIdentifier; + private static DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier; @BeforeClass public static void setUp() { CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("db", "table1", "1"); - dictionaryCacheLoader = new DictionaryCacheLoaderImpl(carbonTableIdentifier, "/tmp/"); + Map<String, String> columnProperties = new HashMap<>(); + columnProperties.put("prop1", "value1"); + columnProperties.put("prop2", "value2"); + columnIdentifier = new ColumnIdentifier("1", columnProperties, DataType.STRING); + dictionaryColumnUniqueIdentifier = + new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier, + columnIdentifier.getDataType(), CarbonStorePath.getCarbonTablePath("/tmp", carbonTableIdentifier)); + dictionaryCacheLoader = new DictionaryCacheLoaderImpl(carbonTableIdentifier, "/tmp/", + dictionaryColumnUniqueIdentifier); dictionaryInfo = new ColumnDictionaryInfo(DataType.STRING); new MockUp<CarbonDictionaryReaderImpl>() { @Mock @SuppressWarnings("unused") Iterator<byte[]> read(long startOffset, long endOffset) @@ -68,10 +78,6 @@ public class DictionaryCacheLoaderImplTest { return Arrays.asList(1, 2); } }; - Map<String, String> columnProperties = new HashMap<>(); - columnProperties.put("prop1", "value1"); - columnProperties.put("prop2", "value2"); - columnIdentifier = new ColumnIdentifier("1", columnProperties, DataType.STRING); } @Test public void testToLoad() throws IOException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryColumnUniqueIdentifierTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryColumnUniqueIdentifierTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryColumnUniqueIdentifierTest.java index 028ae81..0e2eed9 100644 --- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryColumnUniqueIdentifierTest.java +++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryColumnUniqueIdentifierTest.java @@ -47,13 +47,13 @@ public class DictionaryColumnUniqueIdentifierTest { ColumnIdentifier columnIdentifier2 = new ColumnIdentifier("1", properties, DataType.INT); dictionaryColumnUniqueIdentifier1 = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier1, columnIdentifier, - DataType.MAP); + DataType.MAP, null); dictionaryColumnUniqueIdentifier2 = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier2, columnIdentifier2, - DataType.MAP); + DataType.MAP, null); dictionaryColumnUniqueIdentifier3 = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier2, columnIdentifier, - DataType.MAP); + DataType.MAP, null); } @Test public void testToGetDataType() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java index 4efb093..34aed8a 100644 --- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java +++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter; import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl; import org.junit.After; @@ -209,6 +210,10 @@ public class ForwardDictionaryCacheTest extends AbstractDictionaryCacheTest { */ private void writeSortIndexFile(List<String> data, String columnId) throws IOException { ColumnIdentifier columnIdentifier = new ColumnIdentifier(columnId, null, null); + DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = + new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier, + columnIdentifier.getDataType(), + CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier)); Map<String, Integer> dataToSurrogateKeyMap = new HashMap<>(data.size()); int surrogateKey = 0; List<Integer> invertedIndexList = new ArrayList<>(data.size()); @@ -228,7 +233,7 @@ public class ForwardDictionaryCacheTest extends AbstractDictionaryCacheTest { invertedIndexList.add(invertedIndexArray[i]); } CarbonDictionarySortIndexWriter dictionarySortIndexWriter = - new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, columnIdentifier, + new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, dictionaryColumnUniqueIdentifier, carbonStorePath); try { dictionarySortIndexWriter.writeSortIndex(sortedIndexList); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java b/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java index dcd1780..b3fbdd6 100644 --- a/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java @@ -23,11 +23,13 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.service.impl.PathFactory; import org.apache.carbondata.core.service.CarbonCommonFactory; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.service.PathService; +import org.apache.carbondata.core.util.path.CarbonStorePath; import mockit.Mock; import mockit.MockUp; @@ -47,8 +49,12 @@ public class CarbonDictionaryReaderImplTest { columnIdentifier = new ColumnIdentifier("1", null, null); carbonTableIdentifier = new CarbonTableIdentifier("dbName", "tableName", UUID.randomUUID().toString()); + DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = + new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier, + columnIdentifier.getDataType(), + CarbonStorePath.getCarbonTablePath("storePath", carbonTableIdentifier)); carbonDictionaryReaderImpl = - new CarbonDictionaryReaderImpl("storePath", carbonTableIdentifier, columnIdentifier); + new CarbonDictionaryReaderImpl("storePath", carbonTableIdentifier, dictionaryColumnUniqueIdentifier); } @Test public void testRead() throws Exception { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/core/src/test/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java b/core/src/test/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java index 2a2551b..2a4c290 100644 --- a/core/src/test/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java @@ -22,11 +22,13 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.writer.CarbonDictionaryWriter; import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl; import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter; @@ -62,12 +64,14 @@ public class CarbonDictionarySortIndexReaderImplTest { CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("testSchema", "carbon", UUID.randomUUID().toString()); ColumnIdentifier columnIdentifier = new ColumnIdentifier("Name", null, null); + DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier, columnIdentifier.getDataType(), + CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)); CarbonDictionaryWriter dictionaryWriter = new CarbonDictionaryWriterImpl(storePath, - carbonTableIdentifier, columnIdentifier); + carbonTableIdentifier, dictionaryColumnUniqueIdentifier); String metaFolderPath =storePath+File.separator+carbonTableIdentifier.getDatabaseName()+File.separator+carbonTableIdentifier.getTableName()+File.separator+"Metadata"; CarbonUtil.checkAndCreateFolder(metaFolderPath); CarbonDictionarySortIndexWriter dictionarySortIndexWriter = - new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, columnIdentifier, storePath); + new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, dictionaryColumnUniqueIdentifier, storePath); List<int[]> expectedData = prepareExpectedData(); int[] data = expectedData.get(0); for(int i=0;i<data.length;i++) { @@ -81,7 +85,7 @@ public class CarbonDictionarySortIndexReaderImplTest { dictionarySortIndexWriter.writeInvertedSortIndex(invertedSortIndex); dictionarySortIndexWriter.close(); CarbonDictionarySortIndexReader dictionarySortIndexReader = - new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, columnIdentifier, storePath); + new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, dictionaryColumnUniqueIdentifier, storePath); List<Integer> actualSortIndex = dictionarySortIndexReader.readSortIndex(); List<Integer> actualInvertedSortIndex = dictionarySortIndexReader.readInvertedSortIndex(); for (int i = 0; i < actualSortIndex.size(); i++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java index 55abac3..8bb2052 100644 --- a/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Properties; import java.util.UUID; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.util.path.CarbonStorePath; @@ -72,6 +73,8 @@ public class CarbonDictionaryWriterImplTest { private ColumnIdentifier columnIdentifier; + private DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier; + private Properties props; /** @@ -97,6 +100,10 @@ public class CarbonDictionaryWriterImplTest { this.carbonStorePath = props.getProperty("storePath", "carbonStore"); this.columnIdentifier = new ColumnIdentifier("Name", null, null); carbonTableIdentifier = new CarbonTableIdentifier(databaseName, tableName, UUID.randomUUID().toString()); + this.dictionaryColumnUniqueIdentifier = + new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier, + columnIdentifier.getDataType(), + CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier)); deleteStorePath(); prepareDataSet(); } @@ -177,7 +184,7 @@ public class CarbonDictionaryWriterImplTest { private CarbonDictionaryWriterImpl prepareWriter() throws IOException { initDictionaryDirPaths(); return new CarbonDictionaryWriterImpl(this.carbonStorePath, carbonTableIdentifier, - columnIdentifier); + dictionaryColumnUniqueIdentifier); } /** @@ -432,7 +439,7 @@ public class CarbonDictionaryWriterImplTest { private List<CarbonDictionaryColumnMetaChunk> readDictionaryMetadataFile() throws IOException { CarbonDictionaryMetadataReaderImpl columnMetadataReaderImpl = new CarbonDictionaryMetadataReaderImpl(this.carbonStorePath, this.carbonTableIdentifier, - this.columnIdentifier); + this.dictionaryColumnUniqueIdentifier); List<CarbonDictionaryColumnMetaChunk> dictionaryMetaChunkList = null; // read metadata file try { @@ -451,7 +458,7 @@ public class CarbonDictionaryWriterImplTest { throws IOException { CarbonDictionaryReaderImpl dictionaryReader = new CarbonDictionaryReaderImpl(this.carbonStorePath, this.carbonTableIdentifier, - this.columnIdentifier); + this.dictionaryColumnUniqueIdentifier); List<byte[]> dictionaryValues = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); try { if (0 == dictionaryEndOffset) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java b/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java index 24083b7..d04d8a2 100644 --- a/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java @@ -22,11 +22,13 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReader; import org.apache.carbondata.core.reader.sortindex.CarbonDictionarySortIndexReaderImpl; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.writer.CarbonDictionaryWriter; import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl; @@ -53,12 +55,14 @@ public class CarbonDictionarySortIndexWriterImplTest { carbonTableIdentifier = new CarbonTableIdentifier("testSchema", "carbon", UUID.randomUUID().toString()); columnIdentifier = new ColumnIdentifier("Name", null, null); + DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, columnIdentifier, columnIdentifier.getDataType(), + CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)); dictionaryWriter = - new CarbonDictionaryWriterImpl(storePath, carbonTableIdentifier, columnIdentifier); + new CarbonDictionaryWriterImpl(storePath, carbonTableIdentifier, dictionaryColumnUniqueIdentifier); dictionarySortIndexWriter = - new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, columnIdentifier, storePath); + new CarbonDictionarySortIndexWriterImpl(carbonTableIdentifier, dictionaryColumnUniqueIdentifier, storePath); carbonDictionarySortIndexReader = - new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, columnIdentifier, storePath); + new CarbonDictionarySortIndexReaderImpl(carbonTableIdentifier, dictionaryColumnUniqueIdentifier, storePath); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala index 9aed2ea..8b2ceba 100644 --- a/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala +++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala @@ -77,7 +77,9 @@ object GenerateDictionaryExample { println(s"dictionary of dimension: ${dimension.getColName}") println(s"Key\t\t\tValue") val columnIdentifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, - dimension.getColumnIdentifier, dimension.getDataType) + dimension.getColumnIdentifier, dimension.getDataType, + CarbonStorePath + .getCarbonTablePath(carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier)) val dict = CarbonLoaderUtil.getDictionary(columnIdentifier, cc.storePath) var index: Int = 1 var distinctValue = dict.getDictionaryValueForKey(index) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index 249543e..fb3a637 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -54,6 +54,8 @@ import org.apache.carbondata.core.mutate.data.BlockMappingVO; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.core.scan.model.QueryModel; @@ -348,6 +350,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { // process and resolve the expression Expression filter = getFilterPredicates(job.getConfiguration()); CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); + TableProvider tableProvider = new SingleTableProvider(carbonTable); CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); BitSet matchedPartitions = null; PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName()); @@ -365,7 +368,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { } } - FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); + FilterResolverIntf filterInterface = CarbonInputFormatUtil + .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); // do block filtering and get split List<InputSplit> splits = getSplits(job, filterInterface, matchedPartitions, cacheClient, @@ -794,6 +798,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { throws IOException { Configuration configuration = taskAttemptContext.getConfiguration(); CarbonTable carbonTable = getOrCreateCarbonTable(configuration); + TableProvider tableProvider = new SingleTableProvider(carbonTable); // getting the table absoluteTableIdentifier from the carbonTable // to avoid unnecessary deserialization AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); @@ -807,7 +812,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { // set the filter to the query model in order to filter blocklet before scan Expression filter = getFilterPredicates(configuration); CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); - FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier); + FilterResolverIntf filterIntf = CarbonInputFormatUtil + .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); queryModel.setFilterExpressionResolverTree(filterIntf); // update the file level index store if there are invalid segment http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 6ae346f..c69e19f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -47,6 +47,8 @@ import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.mutate.data.BlockMappingVO; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.core.scan.model.QueryModel; @@ -280,6 +282,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { // process and resolve the expression Expression filter = getFilterPredicates(job.getConfiguration()); CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); + TableProvider tableProvider = new SingleTableProvider(carbonTable); // this will be null in case of corrupt schema file. if (null == carbonTable) { throw new IOException("Missing/Corrupt schema file for table."); @@ -300,7 +303,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { } } - FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); + FilterResolverIntf filterInterface = CarbonInputFormatUtil + .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); // do block filtering and get split List<InputSplit> splits = @@ -346,6 +350,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); + TableProvider tableProvider = new SingleTableProvider(carbonTable); // prune partitions for filter query on partition table String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID); BitSet matchedPartitions = null; @@ -360,7 +365,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { } } - FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); + FilterResolverIntf filterInterface = + CarbonInputFormatUtil.resolveFilter(filter, identifier, tableProvider); // do block filtering and get split List<InputSplit> splits = getSplits(job, filterInterface, segmentList, matchedPartitions, partitionInfo, oldPartitionIdList); @@ -543,6 +549,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { throws IOException { Configuration configuration = taskAttemptContext.getConfiguration(); CarbonTable carbonTable = getOrCreateCarbonTable(configuration); + TableProvider tableProvider = new SingleTableProvider(carbonTable); // getting the table absoluteTableIdentifier from the carbonTable // to avoid unnecessary deserialization AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); @@ -556,7 +563,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { // set the filter to the query model in order to filter blocklet before scan Expression filter = getFilterPredicates(configuration); CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); - FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier); + FilterResolverIntf filterIntf = CarbonInputFormatUtil + .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); queryModel.setFilterExpressionResolverTree(filterIntf); // update the file level index store if there are invalid segment http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java index 0239bce..32d879f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java @@ -28,6 +28,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; /** @@ -64,7 +65,8 @@ public class DictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> { dataTypes[i] = carbonColumns[i].getDataType(); dictionaries[i] = forwardDictionaryCache.get(new DictionaryColumnUniqueIdentifier( absoluteTableIdentifier.getCarbonTableIdentifier(), - carbonColumns[i].getColumnIdentifier(), dataTypes[i])); + carbonColumns[i].getColumnIdentifier(), dataTypes[i], + CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier))); } else { dataTypes[i] = carbonColumns[i].getDataType(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index 0dc79fa..a559cc4 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -26,6 +26,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; +import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer; import org.apache.carbondata.core.scan.filter.intf.FilterOptimizerBasic; import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer; @@ -131,11 +132,12 @@ public class CarbonInputFormatUtil { * @return */ public static FilterResolverIntf resolveFilter(Expression filterExpression, - AbsoluteTableIdentifier absoluteTableIdentifier) { + AbsoluteTableIdentifier absoluteTableIdentifier, TableProvider tableProvider) { try { FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor(); //get resolved filter - return filterExpressionProcessor.getFilterResolver(filterExpression, absoluteTableIdentifier); + return filterExpressionProcessor + .getFilterResolver(filterExpression, absoluteTableIdentifier, tableProvider); } catch (Exception e) { throw new RuntimeException("Error while resolving filter expression", e); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index cf44c6f..beca50d 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -325,9 +325,13 @@ public class StoreCreator { .createCache(CacheType.REVERSE_DICTIONARY, absoluteTableIdentifier.getStorePath()); for (int i = 0; i < set.length; i++) { ColumnIdentifier columnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId(), null, null); + DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = + new DictionaryColumnUniqueIdentifier(table.getCarbonTableIdentifier(), columnIdentifier, + columnIdentifier.getDataType(), CarbonStorePath + .getCarbonTablePath(table.getStorePath(), table.getCarbonTableIdentifier())); CarbonDictionaryWriter writer = new CarbonDictionaryWriterImpl(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier(), columnIdentifier); + absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier); for (String value : set[i]) { writer.write(value); } @@ -335,7 +339,8 @@ public class StoreCreator { writer.commit(); Dictionary dict = (Dictionary) dictCache.get( new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(), - columnIdentifier, dims.get(i).getDataType())); + columnIdentifier, dims.get(i).getDataType(),CarbonStorePath + .getCarbonTablePath(table.getStorePath(), table.getCarbonTableIdentifier()))); CarbonDictionarySortInfoPreparator preparator = new CarbonDictionarySortInfoPreparator(); List<String> newDistinctValues = new ArrayList<String>(); @@ -343,7 +348,7 @@ public class StoreCreator { preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType()); CarbonDictionarySortIndexWriter carbonDictionaryWriter = new CarbonDictionarySortIndexWriterImpl( - absoluteTableIdentifier.getCarbonTableIdentifier(), columnIdentifier, + absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier, absoluteTableIdentifier.getStorePath()); try { carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java index f08b92b..5eae253 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java @@ -33,6 +33,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; @@ -88,7 +89,8 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T dataTypes[i] = carbonColumns[i].getDataType(); dictionaries[i] = forwardDictionaryCache.get( new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(), - carbonColumns[i].getColumnIdentifier(), dataTypes[i])); + carbonColumns[i].getColumnIdentifier(), dataTypes[i], + CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier))); } else { dataTypes[i] = carbonColumns[i].getDataType(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java index 86ebc0d..8f3fdce 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java @@ -24,6 +24,8 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.core.scan.model.QueryModel; @@ -120,6 +122,7 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable> private QueryModel getQueryModel(Configuration configuration, String path) throws IOException { CarbonTable carbonTable = getCarbonTable(configuration, path); + TableProvider tableProvider = new SingleTableProvider(carbonTable); // getting the table absoluteTableIdentifier from the carbonTable // to avoid unnecessary deserialization @@ -133,7 +136,8 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable> // set the filter to the query model in order to filter blocklet before scan Expression filter = getFilterPredicates(configuration); CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); - FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier); + FilterResolverIntf filterIntf = + CarbonInputFormatUtil.resolveFilter(filter, identifier, tableProvider); queryModel.setFilterExpressionResolverTree(filterIntf); return queryModel; http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java index 3385170..8aacf88 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java @@ -40,6 +40,8 @@ import org.apache.carbondata.core.scan.expression.LiteralExpression; import org.apache.carbondata.core.scan.expression.conditional.*; import org.apache.carbondata.core.scan.expression.logical.AndExpression; import org.apache.carbondata.core.scan.expression.logical.OrExpression; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; @@ -241,10 +243,11 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { } else if (tmp.size() == 1) finalFilters = tmp.get(0); else return; + TableProvider tableProvider = new SingleTableProvider(carbonTable); // todo set into QueryModel CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable); - queryModel.setFilterExpressionResolverTree( - CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier())); + queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil + .resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier(), tableProvider)); } public static DataType spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java index 6ddb8be..d78f786 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java @@ -61,6 +61,8 @@ import org.apache.carbondata.core.reader.ThriftReader; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.service.impl.PathFactory; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; @@ -298,8 +300,8 @@ public class CarbonTableReader { new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(), UUID.randomUUID().toString()); // get the store path of the table. - cache.carbonTablePath = - PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier); + cache.carbonTablePath = PathFactory.getInstance() + .getCarbonTablePath(storePath, cache.carbonTableIdentifier, null); // cache the table cc.put(table, cache); @@ -385,10 +387,13 @@ public class CarbonTableReader { cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds); } + TableProvider tableProvider = new SingleTableProvider(tableCacheModel.carbonTable); + // get filter for segment CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable); FilterResolverIntf filterInterface = CarbonInputFormatUtil - .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier()); + .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier(), + tableProvider); IUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); List<CarbonLocalInputSplit> result = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala index fbdfebd..a3244ae 100644 --- a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala +++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeSupport.scala @@ -22,7 +22,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.DataType import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn - +import org.apache.carbondata.core.util.path.CarbonStorePath class CarbonDictionaryDecodeReaderSupport[T] { @@ -38,7 +38,8 @@ class CarbonDictionaryDecodeReaderSupport[T] { val dict: Dictionary = forwardDictionaryCache .get(new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier, carbonColumn.getColumnIdentifier, - carbonColumn.getDataType)) + carbonColumn.getColumnIdentifier.getDataType, + CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier))) (carbonColumn.getDataType, dict, index) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index 24c09ca..7c2d157 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -423,7 +423,8 @@ public final class CarbonLoaderUtil { ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType) throws IOException { return getDictionary( - new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType), + new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType, + CarbonStorePath.getCarbonTablePath(carbonStorePath, tableIdentifier)), carbonStorePath); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala index a7b8143..c7ed1c7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@ -34,7 +34,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.cache.dictionary.Dictionary +import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} @@ -42,7 +42,7 @@ import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentif import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.service.{CarbonCommonFactory, PathService} import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} -import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.spark.load.CarbonLoaderUtil import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask} @@ -341,9 +341,16 @@ class CarbonGlobalDictionaryGenerateRDD( var dictionaryForDistinctValueLookUp: Dictionary = _ var dictionaryForSortIndexWriting: Dictionary = _ var dictionaryForDistinctValueLookUpCleared: Boolean = false + val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier = new + DictionaryColumnUniqueIdentifier( + model.table, + model.columnIdentifier(split.index), + model.columnIdentifier(split.index).getDataType, + CarbonStorePath.getCarbonTablePath(model.hdfsLocation, model.table)) val pathService: PathService = CarbonCommonFactory.getPathService val carbonTablePath: CarbonTablePath = - pathService.getCarbonTablePath(model.hdfsLocation, model.table) + pathService + .getCarbonTablePath(model.hdfsLocation, model.table, dictionaryColumnUniqueIdentifier) if (StringUtils.isNotBlank(model.hdfsTempLocation)) { CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, model.hdfsTempLocation) @@ -402,7 +409,7 @@ class CarbonGlobalDictionaryGenerateRDD( val dictWriteTask = new DictionaryWriterTask(valuesBuffer, dictionaryForDistinctValueLookUp, model.table, - model.columnIdentifier(split.index), + dictionaryColumnUniqueIdentifier, model.hdfsLocation, model.primDimensions(split.index).getColumnSchema, isDictFileExists @@ -414,7 +421,7 @@ class CarbonGlobalDictionaryGenerateRDD( // if new data came than rewrite sort index file if (distinctValues.size() > 0) { val sortIndexWriteTask = new SortIndexWriterTask(model.table, - model.columnIdentifier(split.index), + dictionaryColumnUniqueIdentifier, model.primDimensions(split.index).getDataType, model.hdfsLocation, dictionaryForDistinctValueLookUp, http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala index 2b1ccdf..a9ac9f1 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala @@ -20,7 +20,7 @@ import java.io.IOException import scala.collection.mutable -import org.apache.carbondata.core.cache.dictionary.Dictionary +import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier} import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema @@ -33,7 +33,7 @@ import org.apache.carbondata.core.writer.CarbonDictionaryWriter * @param valuesBuffer * @param dictionary * @param carbonTableIdentifier - * @param columnIdentifier + * @param dictionaryColumnUniqueIdentifier * @param carbonStoreLocation * @param columnSchema * @param isDictionaryFileExist @@ -42,7 +42,7 @@ import org.apache.carbondata.core.writer.CarbonDictionaryWriter class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String], dictionary: Dictionary, carbonTableIdentifier: CarbonTableIdentifier, - columnIdentifier: ColumnIdentifier, + dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier, carbonStoreLocation: String, columnSchema: ColumnSchema, isDictionaryFileExist: Boolean, @@ -59,7 +59,7 @@ class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String], val dictService = CarbonCommonFactory.getDictionaryService writer = dictService.getDictionaryWriter( carbonTableIdentifier, - columnIdentifier, + dictionaryColumnUniqueIdentifier, carbonStoreLocation) val distinctValues: java.util.List[String] = new java.util.ArrayList() http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala index c0aa0f9..27f9418 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala @@ -16,7 +16,7 @@ */ package org.apache.carbondata.spark.tasks -import org.apache.carbondata.core.cache.dictionary.Dictionary +import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier} import org.apache.carbondata.core.metadata.datatype.DataType import org.apache.carbondata.core.service.CarbonCommonFactory @@ -26,7 +26,7 @@ import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWri * This task writes sort index file * * @param carbonTableIdentifier - * @param columnIdentifier + * @param dictionaryColumnUniqueIdentifier * @param dataType * @param carbonStoreLocation * @param dictionary @@ -35,7 +35,7 @@ import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWri */ class SortIndexWriterTask( carbonTableIdentifier: CarbonTableIdentifier, - columnIdentifier: ColumnIdentifier, + dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier, dataType: DataType, carbonStoreLocation: String, dictionary: Dictionary, @@ -50,7 +50,8 @@ class SortIndexWriterTask( preparator.getDictionarySortInfo(distinctValues, dictionary, dataType) carbonDictionarySortIndexWriter = - dictService.getDictionarySortIndexWriter(carbonTableIdentifier, columnIdentifier, + dictService + .getDictionarySortIndexWriter(carbonTableIdentifier, dictionaryColumnUniqueIdentifier, carbonStoreLocation) carbonDictionarySortIndexWriter.writeSortIndex(dictionarySortInfo.getSortIndex) carbonDictionarySortIndexWriter http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index e1c564d..1f7862b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.util.FileUtils import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.cache.dictionary.Dictionary +import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} @@ -169,9 +169,15 @@ object GlobalDictionaryUtil { columnIndex: Int, iter: Iterator[String]): Unit = { val dictService = CarbonCommonFactory.getDictionaryService - val writer: CarbonDictionaryWriter = dictService.getDictionaryWriter( + val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier = new + DictionaryColumnUniqueIdentifier( model.table, model.columnIdentifier(columnIndex), + model.columnIdentifier(columnIndex).getDataType, + CarbonStorePath.getCarbonTablePath(model.hdfsLocation, model.table)) + val writer: CarbonDictionaryWriter = dictService.getDictionaryWriter( + model.table, + dictionaryColumnUniqueIdentifier, model.hdfsLocation ) try { @@ -207,10 +213,16 @@ object GlobalDictionaryUtil { val dictMap = new HashMap[String, HashSet[String]] val dictService = CarbonCommonFactory.getDictionaryService for (i <- model.primDimensions.indices) { + val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier = new + DictionaryColumnUniqueIdentifier( + model.table, + model.columnIdentifier(i), + model.columnIdentifier(i).getDataType, + CarbonStorePath.getCarbonTablePath(model.hdfsLocation, model.table)) val set = new HashSet[String] if (model.dictFileExists(i)) { val reader: CarbonDictionaryReader = dictService.getDictionaryReader(model.table, - model.columnIdentifier(i), model.hdfsLocation + dictionaryColumnUniqueIdentifier, model.hdfsLocation ) val values = reader.read if (values != null) { @@ -835,6 +847,12 @@ object GlobalDictionaryUtil { val columnIdentifier = new ColumnIdentifier(columnSchema.getColumnUniqueId, null, columnSchema.getDataType) + val dictionaryColumnUniqueIdentifier: DictionaryColumnUniqueIdentifier = new + DictionaryColumnUniqueIdentifier( + tableIdentifier, + columnIdentifier, + columnIdentifier.getDataType, + carbonTablePath) val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(defaultValue, columnSchema) val valuesBuffer = new mutable.HashSet[String] if (null != parsedValue) { @@ -843,7 +861,7 @@ object GlobalDictionaryUtil { val dictWriteTask = new DictionaryWriterTask(valuesBuffer, dictionary, tableIdentifier, - columnIdentifier, + dictionaryColumnUniqueIdentifier, storePath, columnSchema, false @@ -855,7 +873,7 @@ object GlobalDictionaryUtil { if (distinctValues.size() > 0) { val sortIndexWriteTask = new SortIndexWriterTask(tableIdentifier, - columnIdentifier, + dictionaryColumnUniqueIdentifier, columnSchema.getDataType, storePath, dictionary, http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index 621a960..37505d0 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -35,8 +35,8 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnIdent import org.apache.carbondata.core.metadata.datatype.DataType import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension -import org.apache.carbondata.core.stats._ import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil} +import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.spark.CarbonAliasDecoderRelation /** @@ -224,7 +224,8 @@ case class CarbonDictionaryDecoder( if (dictionaryId._2 != null) { new DictionaryColumnUniqueIdentifier( atiMap(dictionaryId._1).getCarbonTableIdentifier, - dictionaryId._2, dictionaryId._3) + dictionaryId._2, dictionaryId._3, + CarbonStorePath.getCarbonTablePath(atiMap(dictionaryId._1))) } else { null } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/spark/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala index c0dfc68..e2b185e 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.test.TestQueryExecutor import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.spark.load.CarbonLoaderUtil /** @@ -41,7 +42,8 @@ object DictionaryTestCaseUtil { val dimension = table.getDimensionByName(table.getFactTableName, columnName) val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getFactTableName, "uniqueid") val columnIdentifier = new DictionaryColumnUniqueIdentifier(tableIdentifier, - dimension.getColumnIdentifier, dimension.getDataType + dimension.getColumnIdentifier, dimension.getDataType, + CarbonStorePath.getCarbonTablePath(table.getStorePath, table.getCarbonTableIdentifier) ) val dict = CarbonLoaderUtil.getDictionary(columnIdentifier, TestQueryExecutor.storeLocation) assert(dict.getSurrogateKey(value) != CarbonCommonConstants.INVALID_SURROGATE_KEY) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala index a35f88b..a126686 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala @@ -24,10 +24,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.{CarbonEnv, CarbonRelation} import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier import org.apache.carbondata.core.service.impl.PathFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.processing.constants.TableOptionConstant import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel} @@ -130,8 +133,13 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft } val carbonTableIdentifier = sampleRelation.tableMeta.carbonTable.getCarbonTableIdentifier val columnIdentifier = sampleRelation.tableMeta.carbonTable.getDimensionByName("employee", "empid").getColumnIdentifier + val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier( + carbonTableIdentifier, + columnIdentifier, + columnIdentifier.getDataType, + CarbonStorePath.getCarbonTablePath(storeLocation, carbonTableIdentifier)) val carbonTablePath = PathFactory.getInstance() - .getCarbonTablePath(sampleRelation.tableMeta.storePath, carbonTableIdentifier) + .getCarbonTablePath(sampleRelation.tableMeta.storePath, carbonTableIdentifier, dictionaryColumnUniqueIdentifier) val dictPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId) val dictFile = FileFactory.getCarbonFile(dictPath, FileFactory.getFileType(dictPath)) val offSet = dictFile.getSize http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index 7e27d1b..1addd03 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -39,6 +39,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.util.DataTypeUtil +import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.rdd.{CarbonRDD, CarbonRDDWithTableInfo} import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl @@ -248,7 +249,8 @@ case class CarbonDictionaryDecoder( try { cache.get(new DictionaryColumnUniqueIdentifier( atiMap(f._1).getCarbonTableIdentifier, - f._2, f._3.getDataType)) + f._2, f._3.getDataType, + CarbonStorePath.getCarbonTablePath(atiMap(f._1)))) } catch { case _: Throwable => null } @@ -268,7 +270,8 @@ case class CarbonDictionaryDecoder( try { val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier( atiMap(tableName).getCarbonTableIdentifier, - columnIdentifier, carbonDimension.getDataType) + columnIdentifier, carbonDimension.getDataType, + CarbonStorePath.getCarbonTablePath(atiMap(tableName))) allDictIdentifiers += dictionaryColumnUniqueIdentifier; new ForwardDictionaryWrapper( storePath, @@ -566,7 +569,8 @@ class CarbonDecoderRDD( try { cache.get(new DictionaryColumnUniqueIdentifier( atiMap(f._1).getCarbonTableIdentifier, - f._2, f._3.getDataType)) + f._2, f._3.getDataType, + CarbonStorePath.getCarbonTablePath(atiMap(f._1)))) } catch { case _: Throwable => null } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala index 62b0aff..e7eb422 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.test.TestQueryExecutor import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonTableIdentifier +import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.spark.load.CarbonLoaderUtil /** @@ -41,7 +42,8 @@ object DictionaryTestCaseUtil { val dimension = table.getDimensionByName(table.getFactTableName, columnName) val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getFactTableName, "uniqueid") val columnIdentifier = new DictionaryColumnUniqueIdentifier(tableIdentifier, - dimension.getColumnIdentifier, dimension.getDataType + dimension.getColumnIdentifier, dimension.getDataType, + CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier) ) val dict = CarbonLoaderUtil.getDictionary(columnIdentifier, TestQueryExecutor.storeLocation) assert(dict.getSurrogateKey(value) != CarbonCommonConstants.INVALID_SURROGATE_KEY) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index 729f9e3..8373b58 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -41,6 +41,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.processing.newflow.dictionary.DictionaryServerClientDictionary; import org.apache.carbondata.processing.newflow.dictionary.DirectDictionary; import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary; @@ -119,7 +120,8 @@ public class PrimitiveDataType implements GenericDataType<Object> { this.carbonDimension = carbonDimension; DictionaryColumnUniqueIdentifier identifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, - carbonDimension.getColumnIdentifier(), carbonDimension.getDataType()); + carbonDimension.getColumnIdentifier(), carbonDimension.getDataType(), + CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)); try { if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) { dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java index 5f7bd02..2614e17 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java @@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.processing.newflow.DataField; import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder; import org.apache.carbondata.processing.newflow.dictionary.DictionaryServerClientDictionary; @@ -69,7 +70,8 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert this.isEmptyBadRecord = isEmptyBadRecord; DictionaryColumnUniqueIdentifier identifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, - dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType()); + dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType(), + CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)); // if use one pass, use DictionaryServerClientDictionary if (useOnePass) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3a09e27/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index aad0d3f..3bb186e 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -317,9 +317,11 @@ public class StoreCreator { .createCache(CacheType.REVERSE_DICTIONARY, absoluteTableIdentifier.getStorePath()); for (int i = 0; i < set.length; i++) { ColumnIdentifier columnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId(), null, null); + DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(table.getCarbonTableIdentifier(), columnIdentifier, columnIdentifier.getDataType(), + CarbonStorePath.getCarbonTablePath(table.getStorePath(), table.getCarbonTableIdentifier())); CarbonDictionaryWriter writer = new CarbonDictionaryWriterImpl(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier(), columnIdentifier); + absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier); for (String value : set[i]) { writer.write(value); } @@ -327,7 +329,8 @@ public class StoreCreator { writer.commit(); Dictionary dict = (Dictionary) dictCache.get( new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(), - columnIdentifier, dims.get(i).getDataType())); + columnIdentifier, dims.get(i).getDataType(), + CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier))); CarbonDictionarySortInfoPreparator preparator = new CarbonDictionarySortInfoPreparator(); List<String> newDistinctValues = new ArrayList<String>(); @@ -335,7 +338,7 @@ public class StoreCreator { preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType()); CarbonDictionarySortIndexWriter carbonDictionaryWriter = new CarbonDictionarySortIndexWriterImpl( - absoluteTableIdentifier.getCarbonTableIdentifier(), columnIdentifier, + absoluteTableIdentifier.getCarbonTableIdentifier(), dictionaryColumnUniqueIdentifier, absoluteTableIdentifier.getStorePath()); try { carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
