Repository: carbondata

Updated Branches:
  refs/heads/master a7926ea13 -> 531ecdf3f
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 6803fc8..c6efd77 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -70,9 +70,15 @@ public class SegmentIndexFileStore {
    */
   private Map<String, byte[]> carbonIndexMapWithFullPath;
 
+  /**
+   * Stores the list of index files in a merge file
+   */
+  private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
+
   public SegmentIndexFileStore() {
     carbonIndexMap = new HashMap<>();
     carbonIndexMapWithFullPath = new HashMap<>();
+    carbonMergeFileToIndexFilesMap = new HashMap<>();
   }
 
   /**
@@ -201,6 +207,28 @@
   }
 
   /**
+   * Read all index file names of the segment
+   *
+   * @param segmentPath
+   * @return
+   * @throws IOException
+   */
+  public Map<String, String> getMergeOrIndexFilesFromSegment(String segmentPath)
+      throws IOException {
+    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+    Map<String, String> indexFiles = new HashMap<>();
+    for (int i = 0; i < carbonIndexFiles.length; i++) {
+      if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFiles
+            .put(carbonIndexFiles[i].getAbsolutePath(), carbonIndexFiles[i].getAbsolutePath());
+      } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null);
+      }
+    }
+    return indexFiles;
+  }
+
+  /**
    * List all the index files inside merge file.
    * @param mergeFile
    * @return
@@ -221,13 +249,14 @@ public class SegmentIndexFileStore {
    * @param mergeFilePath
    * @throws IOException
    */
-  private void readMergeFile(String mergeFilePath) throws IOException {
+  public void readMergeFile(String mergeFilePath) throws IOException {
     ThriftReader thriftReader = new ThriftReader(mergeFilePath);
     try {
       thriftReader.open();
       MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
       MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
       List<String> file_names = indexHeader.getFile_names();
+      carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names);
       List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
       CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
       assert (file_names.size() == fileData.size());
@@ -298,8 +327,8 @@ public class SegmentIndexFileStore {
     CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
     return carbonFile.listFiles(new CarbonFileFilter() {
       @Override public boolean accept(CarbonFile file) {
-        return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT);
+        return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
       }
     });
   }
@@ -428,4 +457,8 @@ public class SegmentIndexFileStore {
         + " is " + (System.currentTimeMillis() - startTime));
     return carbondataFileFooter.getBlockletList();
   }
+
+  public Map<String, List<String>> getCarbonMergeFileToIndexFilesMap() {
+    return carbonMergeFileToIndexFilesMap;
+  }
 }
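Taken together, the additions above let a caller discover a segment's merge files and expand them on demand. A minimal usage sketch, assuming a valid segment path and an IOException-handling context (variable names here are illustrative, not from the patch):

    SegmentIndexFileStore store = new SegmentIndexFileStore();
    // Key: absolute file path; value: the same path for a merge file, null for a plain index file
    Map<String, String> files = store.getMergeOrIndexFilesFromSegment(segmentPath);
    for (Map.Entry<String, String> entry : files.entrySet()) {
      if (entry.getValue() != null) {
        // reading a merge file also records its constituent index file names
        store.readMergeFile(entry.getKey());
      }
    }
    Map<String, List<String>> mergeFileToIndexFiles = store.getCarbonMergeFileToIndexFilesMap();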
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index b764bdf..496a1d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -16,13 +16,15 @@
  */
 package org.apache.carbondata.core.indexstore.row;
 
+import java.io.Serializable;
+
 import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 
 /**
  * It is just a normal row to store data. Implementation classes could be safe or unsafe.
  * TODO move this class to a global row and use it across loading after the DataType class is changed
  */
-public abstract class DataMapRow {
+public abstract class DataMapRow implements Serializable {
 
   protected CarbonRowSchema[] schemas;
 
@@ -88,4 +90,13 @@ public abstract class DataMapRow {
   public int getColumnCount() {
     return schemas.length;
   }
+
+  /**
+   * default implementation
+   *
+   * @return
+   */
+  public DataMapRow convertToSafeRow() {
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 1b95984..1c1ecad 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -30,7 +30,12 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
  */
 public class UnsafeDataMapRow extends DataMapRow {
 
-  private MemoryBlock block;
+  private static final long serialVersionUID = -1156704133552046321L;
+
+  // As it is an unsafe memory block it is not recommended to serialize.
+  // If it must be serialized, then override the writeObject method,
+  // which should take care of clearing the unsafe memory post serialization
+  private transient MemoryBlock block;
 
   private int pointer;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
index 813be4a..1a77467 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -16,12 +16,16 @@
  */
 package org.apache.carbondata.core.indexstore.schema;
 
+import java.io.Serializable;
+
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * It has just 2 types right now, either fixed or variable.
  */
-public abstract class CarbonRowSchema {
+public abstract class CarbonRowSchema implements Serializable {
+
+  private static final long serialVersionUID = -8061282029097686495L;
 
   protected DataType dataType;
 
@@ -29,6 +33,10 @@ public abstract class CarbonRowSchema {
     this.dataType = dataType;
   }
 
+  public void setDataType(DataType dataType) {
+    this.dataType = dataType;
+  }
+
   /**
    * Either fixed or variable length.
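The three row/schema changes above make DataMapRow serializable while keeping the unsafe memory block out of any serialized form. A hedged sketch of the intended call pattern (stream setup assumed; the unsafe implementation is expected to override convertToSafeRow, which this patch only provides a default for):

    // Convert before serializing: the transient MemoryBlock of an unsafe row
    // would otherwise come back as null on the deserializing side.
    DataMapRow safeRow = unsafeRow.convertToSafeRow();
    objectOutputStream.writeObject(safeRow);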
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index dff496b..9326b1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -492,6 +492,35 @@ public class SegmentFileStore {
   }
 
   /**
+   * Gets all index files from this segment
+   * @return
+   */
+  public Map<String, String> getIndexOrMergeFiles() {
+    Map<String, String> indexFiles = new HashMap<>();
+    if (segmentFile != null) {
+      for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+        String location = entry.getKey();
+        if (entry.getValue().isRelative) {
+          location = tablePath + location;
+        }
+        if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+          String mergeFileName = entry.getValue().getMergeFileName();
+          if (null != mergeFileName) {
+            indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + mergeFileName,
+                entry.getValue().mergeFileName);
+          } else {
+            for (String indexFile : entry.getValue().getFiles()) {
+              indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+                  entry.getValue().mergeFileName);
+            }
+          }
+        }
+      }
+    }
+    return indexFiles;
+  }
+
+  /**
    * Gets all carbon index files from this segment
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index c8ac15a..c7bcf2e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -97,6 +97,11 @@ public class TableInfo implements Serializable, Writable {
 
   private List<RelationIdentifier> parentRelationIdentifiers;
 
+  /**
+   * flag to check whether any schema modification operation has happened after creation of table
+   */
+  private boolean isSchemaModified;
+
   public TableInfo() {
     dataMapSchemaList = new ArrayList<>();
     isTransactionalTable = true;
@@ -115,6 +120,18 @@ public class TableInfo implements Serializable, Writable {
   public void setFactTable(TableSchema factTable) {
     this.factTable = factTable;
     updateParentRelationIdentifier();
+    updateIsSchemaModified();
+  }
+
+  private void updateIsSchemaModified() {
+    if (null != factTable.getSchemaEvalution()) {
+      // If the schema evolution entry list size is > 1, an alter operation has been
+      // performed, which added a new entry to the schema evolution list.
+      // Currently, apart from create table, schema evolution entries
+      // are added only by the alter operations.
+      isSchemaModified =
+          factTable.getSchemaEvalution().getSchemaEvolutionEntryList().size() > 1 ? true : false;
+    }
   }
 
   private void updateParentRelationIdentifier() {
@@ -273,6 +290,7 @@ public class TableInfo implements Serializable, Writable {
         parentRelationIdentifiers.get(i).write(out);
       }
     }
+    out.writeBoolean(isSchemaModified);
   }
 
   @Override public void readFields(DataInput in) throws IOException {
@@ -308,6 +326,7 @@ public class TableInfo implements Serializable, Writable {
         this.parentRelationIdentifiers.add(relationIdentifier);
       }
     }
+    this.isSchemaModified = in.readBoolean();
   }
 
   public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {
@@ -343,4 +362,9 @@ public class TableInfo implements Serializable, Writable {
   public void setTransactionalTable(boolean transactionalTable) {
     isTransactionalTable = transactionalTable;
   }
+
+  public boolean isSchemaModified() {
+    return isSchemaModified;
+  }
+
 }
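Since TableInfo is a Writable, the new flag has to be written last in write() and read last in readFields() so that field order stays identical on both sides. A minimal round-trip sketch, assuming the standard java.io stream classes and an IOException-handling context:

    // Hedged illustration: write() and readFields() must visit fields in the same order,
    // so out.writeBoolean(isSchemaModified) is appended after all previously written fields.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    tableInfo.write(new DataOutputStream(bytes));
    TableInfo copy = new TableInfo();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    assert copy.isSchemaModified() == tableInfo.isSchemaModified();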
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index bc4a90d..41ce31c 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -68,11 +68,11 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
     if (segment.getSegmentFileName() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
-      indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+      indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
     } else {
       SegmentFileStore fileStore =
           new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-      indexFiles = fileStore.getIndexFiles();
+      indexFiles = fileStore.getIndexOrMergeFiles();
     }
     return indexFiles;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
new file mode 100644
index 0000000..0d28b9f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+public class BlockletDataMapUtil {
+
+  public static Map<String, BlockMetaInfo> getBlockMetaInfoMap(
+      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore,
+      Set<String> filesRead, Map<String, BlockMetaInfo> fileNameToMetaInfoMapping)
+      throws IOException {
+    if (identifier.getMergeIndexFileName() != null
+        && indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getMergeIndexFileName());
+      if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
+        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
+        filesRead.add(indexMergeFile.getPath());
+      }
+    }
+    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getIndexFileName()) });
+    }
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
+    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
+    for (DataFileFooter footer : indexInfo) {
+      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
+      if (null == blockMetaInfoMap.get(blockPath)) {
+        blockMetaInfoMap.put(blockPath, createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath));
+      }
+    }
+    return blockMetaInfoMap;
+  }
+
+  /**
+   * This method will create a file name to block meta info mapping.
+   * This reduces the number of namenode calls: one namenode call fetches up to 1000 entries.
+   *
+   * @param segmentFilePath
+   * @return
+   * @throws IOException
+   */
+  public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
+      String segmentFilePath) throws IOException {
+    Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
+    if (carbonFile instanceof AbstractDFSCarbonFile) {
+      PathFilter pathFilter = new PathFilter() {
+        @Override public boolean accept(Path path) {
+          return CarbonTablePath.isCarbonDataFile(path.getName());
+        }
+      };
+      CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(pathFilter);
+      for (CarbonFile file : carbonFiles) {
+        String[] location = file.getLocations();
+        long len = file.getSize();
+        BlockMetaInfo blockMetaInfo = new BlockMetaInfo(location, len);
+        fileNameToMetaInfoMapping.put(file.getPath().toString(), blockMetaInfo);
+      }
+    }
+    return fileNameToMetaInfoMapping;
+  }
+
+  private static BlockMetaInfo createBlockMetaInfo(
+      Map<String, BlockMetaInfo> fileNameToMetaInfoMapping, String carbonDataFile) {
+    FileFactory.FileType fileType = FileFactory.getFileType(carbonDataFile);
+    switch (fileType) {
+      case LOCAL:
+        CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile, fileType);
+        return new BlockMetaInfo(new String[] { "localhost" }, carbonFile.getSize());
+      default:
+        return fileNameToMetaInfoMapping.get(carbonDataFile);
+    }
+  }
+
+  public static Set<TableBlockIndexUniqueIdentifier> getTableBlockUniqueIdentifiers(Segment segment)
+      throws IOException {
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>();
+    Map<String, String> indexFiles = segment.getCommittedIndexFile();
+    for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+      Path indexFile = new Path(indexFileEntry.getKey());
+      tableBlockIndexUniqueIdentifiers.add(
+          new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(), indexFile.getName(),
+              indexFileEntry.getValue(), segment.getSegmentNo()));
+    }
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+  /**
+   * This method will filter out the TableBlockIndexUniqueIdentifier that belongs to the given
+   * distributable
+   *
+   * @param tableBlockIndexUniqueIdentifiers
+   * @param distributable
+   * @return
+   */
+  public static TableBlockIndexUniqueIdentifier filterIdentifiersBasedOnDistributable(
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
+      BlockletDataMapDistributable distributable) {
+    TableBlockIndexUniqueIdentifier validIdentifier = null;
+    String fileName = CarbonTablePath.DataFileUtil.getFileName(distributable.getFilePath());
+    for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
+        tableBlockIndexUniqueIdentifiers) {
+      if (fileName.equals(tableBlockIndexUniqueIdentifier.getIndexFileName())) {
+        validIdentifier = tableBlockIndexUniqueIdentifier;
+        break;
+      }
+    }
+    return validIdentifier;
+  }
+
+  /**
+   * This method will get the TableBlockIndexUniqueIdentifiers of the index files contained
+   * in a merge index file
+   *
+   * @param identifier
+   * @return
+   * @throws IOException
+   */
+  public static List<TableBlockIndexUniqueIdentifier> getIndexFileIdentifiersFromMergeFile(
+      TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore segmentIndexFileStore)
+      throws IOException {
+    List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new ArrayList<>();
+    String mergeFilePath =
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName();
+    segmentIndexFileStore.readMergeFile(mergeFilePath);
+    List<String> indexFiles =
+        segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath);
+    for (String indexFile : indexFiles) {
+      tableBlockIndexUniqueIdentifiers.add(
+          new TableBlockIndexUniqueIdentifier(identifier.getIndexFilePath(), indexFile,
+              identifier.getIndexFileName(), identifier.getSegmentId()));
+    }
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+}
\ No newline at end of file
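The intended call pattern for the two block-meta methods is to build the expensive mapping once per segment and reuse it for each index file. A hedged sketch (segment, segmentPath and indexFileStore are assumed to be in scope, inside an IOException-handling context):

    // One location-aware listing per segment replaces a namenode round trip per block.
    Map<String, BlockMetaInfo> mapping =
        BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentPath);
    Set<String> filesRead = new HashSet<>();
    for (TableBlockIndexUniqueIdentifier id :
        BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment)) {
      Map<String, BlockMetaInfo> blockMetaInfoMap =
          BlockletDataMapUtil.getBlockMetaInfoMap(id, indexFileStore, filesRead, mapping);
    }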
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 8544da9..3823aef 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -219,6 +219,11 @@ public class SessionParams implements Serializable, Cloneable {
         throw new InvalidConfigurationException(
             String.format("Invalid configuration of %s, datamap does not exist", key));
       }
+    } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+      isValid = CarbonUtil.validateBoolean(value);
+      if (!isValid) {
+        throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
+      }
     } else {
       throw new InvalidConfigurationException(
           "The key " + key + " not supported for dynamic configuration.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index b9f4838..62192ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -533,7 +533,7 @@ public class CarbonTablePath {
   /**
    * Return the file name from file path
    */
-  private static String getFileName(String dataFilePath) {
+  public static String getFileName(String dataFilePath) {
     int endIndex = dataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
     if (endIndex > -1) {
       return dataFilePath.substring(endIndex + 1, dataFilePath.length());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
new file mode 100644
index 0000000..dfbdd29
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.blockletindex;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+
+import mockit.Deencapsulation;
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestBlockletDataMapFactory {
+
+  private CarbonTable carbonTable;
+
+  private AbsoluteTableIdentifier absoluteTableIdentifier;
+
+  private TableInfo tableInfo;
+
+  private BlockletDataMapFactory blockletDataMapFactory;
+
+  private TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier;
+
+  private Cache<TableBlockIndexUniqueIdentifier, BlockletDataMapIndexWrapper> cache;
+
+  @Before public void setUp()
+      throws ClassNotFoundException, IllegalAccessException, InvocationTargetException,
+      InstantiationException {
+    tableInfo = new TableInfo();
+    Constructor<?> constructor =
+        Class.forName("org.apache.carbondata.core.metadata.schema.table.CarbonTable")
+            .getDeclaredConstructors()[0];
+    constructor.setAccessible(true);
+    carbonTable = (CarbonTable) constructor.newInstance();
+    absoluteTableIdentifier =
+        AbsoluteTableIdentifier.from("/opt/store/default/carbon_table/", "default", "carbon_table");
+    Deencapsulation.setField(tableInfo, "identifier", absoluteTableIdentifier);
+    Deencapsulation.setField(carbonTable, "tableInfo", tableInfo);
+    blockletDataMapFactory = new BlockletDataMapFactory(carbonTable, new DataMapSchema());
+    Deencapsulation.setField(blockletDataMapFactory, "cache",
+        CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP));
+    tableBlockIndexUniqueIdentifier =
+        new TableBlockIndexUniqueIdentifier("/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+            "0_batchno0-0-1521012756709.carbonindex", null, "0");
+    cache = CacheProvider.getInstance().createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
+  }
+
+  @Test public void addDataMapToCache()
+      throws IOException, MemoryException, NoSuchMethodException, InvocationTargetException,
+      IllegalAccessException {
+    List<BlockletDataMap> dataMaps = new ArrayList<>();
+    Method method = BlockletDataMapFactory.class
+        .getDeclaredMethod("cache", TableBlockIndexUniqueIdentifier.class,
+            BlockletDataMapIndexWrapper.class);
+    method.setAccessible(true);
+    method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifier,
+        new BlockletDataMapIndexWrapper(dataMaps));
+    BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifier);
+    assert null != result;
+  }
+
+  @Test public void getValidDistributables() throws IOException {
+    BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable(
+        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex");
+    Segment segment = new Segment("0", null);
+    blockletDataMapDistributable.setSegment(segment);
+    BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable(
+        "/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756701.carbonindex");
+    blockletDataMapDistributable1.setSegment(segment);
+    List<DataMapDistributable> dataMapDistributables = new ArrayList<>(2);
+    dataMapDistributables.add(blockletDataMapDistributable);
+    dataMapDistributables.add(blockletDataMapDistributable1);
+    new MockUp<BlockletDataMapFactory>() {
+      @Mock Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
+          Segment segment) {
+        TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier1 =
+            new TableBlockIndexUniqueIdentifier(
+                "/opt/store/default/carbon_table/Fact/Part0/Segment_0",
+                "0_batchno0-0-1521012756701.carbonindex", null, "0");
+        Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = new HashSet<>(3);
+        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
+        tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier1);
+        return tableBlockIndexUniqueIdentifiers;
+      }
+    };
+    List<DataMapDistributable> validDistributables =
+        blockletDataMapFactory.getAllUncachedDistributables(dataMapDistributables);
+    assert 1 == validDistributables.size();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 8be1e2e..32af8d3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -16,21 +16,39 @@
  */
 package org.apache.carbondata.hadoop;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datastore.SegmentTaskIndexStore;
 import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
  * CacheClient : Holds all the Cache access clients for Btree, Dictionary
  */
 public class CacheClient {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CacheClient.class.getName());
+
+  private final Object lock = new Object();
+
   // segment access client for driver LRU cache
   private CacheAccessClient<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>
       segmentAccessClient;
 
+  private static Map<SegmentTaskIndexStore.SegmentPropertiesWrapper, SegmentProperties>
+      segmentProperties = new ConcurrentHashMap<>();
+
   public CacheClient() {
     Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
         CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE);
@@ -45,4 +63,35 @@ public class CacheClient {
   public void close() {
     segmentAccessClient.close();
   }
+
+  /**
+   * Method to get the segment properties and avoid construction of new segment properties
+   * as long as the schema is not modified
+   *
+   * @param tableIdentifier
+   * @param columnsInTable
+   * @param columnCardinality
+   */
+  public SegmentProperties getSegmentProperties(AbsoluteTableIdentifier tableIdentifier,
+      List<ColumnSchema> columnsInTable, int[] columnCardinality) {
+    SegmentTaskIndexStore.SegmentPropertiesWrapper segmentPropertiesWrapper =
+        new SegmentTaskIndexStore.SegmentPropertiesWrapper(tableIdentifier, columnsInTable,
+            columnCardinality);
+    SegmentProperties segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
+    if (null == segmentProperties) {
+      synchronized (lock) {
+        segmentProperties = this.segmentProperties.get(segmentPropertiesWrapper);
+        if (null == segmentProperties) {
+          // create the segment properties metadata details;
+          // this will be useful in query handling.
+          // All the data file metadata will have common segment properties, so we
+          // can use the first one to create the segment properties
+          LOGGER.info("Constructing new SegmentProperties");
+          segmentProperties = new SegmentProperties(columnsInTable, columnCardinality);
+          this.segmentProperties.put(segmentPropertiesWrapper, segmentProperties);
+        }
+      }
+    }
+    return segmentProperties;
+  }
 }
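getSegmentProperties uses double-checked locking over a static ConcurrentHashMap, so concurrent queries against the same (schema, cardinality) pair pay the construction cost only once. Note that the lock is per CacheClient instance while the map is static, so two distinct clients could still race; the worst case is one redundant construction, which is benign since the put simply overwrites. A hedged usage sketch (arguments assumed to come from the loaded table metadata):

    // Identical inputs now return the same cached SegmentProperties instance.
    SegmentProperties props =
        cacheClient.getSegmentProperties(tableIdentifier, columnsInTable, columnCardinality);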
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
new file mode 100644
index 0000000..6835184
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/AbstractDataMapJob.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.util.List;
+
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * abstract class for data map job
+ */
+public abstract class AbstractDataMapJob implements DataMapJob {
+
+  @Override public void execute(CarbonTable carbonTable,
+      FileInputFormat<Void, BlockletDataMapIndexWrapper> format) {
+  }
+
+  @Override public List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
+      FilterResolverIntf resolverIntf) {
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 2af147d..0e8cf6a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -67,7 +67,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
 
-  protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+  public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
     CarbonTable carbonTableTemp;
     if (carbonTable == null) {
       // carbon table should be created either from deserialized table info (schema saved in
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 25cc228..a5d4df2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -156,7 +156,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  protected abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
+  public abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
       throws IOException;
 
   public static void setTablePath(Configuration configuration, String tablePath) {
@@ -172,7 +172,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
   }
 
-  public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+  public static void setDataMapJob(Configuration configuration, Object dataMapJob)
       throws IOException {
     if (dataMapJob != null) {
       String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
@@ -466,15 +466,30 @@ m filterExpression
   private List<ExtendedBlocklet> executeDataMapJob(CarbonTable carbonTable,
       FilterResolverIntf resolver, List<Segment> segmentIds, DataMapExprWrapper dataMapExprWrapper,
       DataMapJob dataMapJob, List<PartitionSpec> partitionsToPrune) throws IOException {
-    DistributableDataMapFormat datamapDstr =
-        new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
-            partitionsToPrune, BlockletDataMapFactory.class.getName());
-    List<ExtendedBlocklet> prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+    String className = "org.apache.carbondata.hadoop.api.DistributableDataMapFormat";
+    FileInputFormat dataMapFormat =
+        createDataMapJob(carbonTable, dataMapExprWrapper, segmentIds, partitionsToPrune, className);
+    List<ExtendedBlocklet> prunedBlocklets =
+        dataMapJob.execute((DistributableDataMapFormat) dataMapFormat, resolver);
     // Apply expression on the blocklets.
     prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
     return prunedBlocklets;
   }
+
+  public static FileInputFormat createDataMapJob(CarbonTable carbonTable,
+      DataMapExprWrapper dataMapExprWrapper, List<Segment> segments,
+      List<PartitionSpec> partitionsToPrune, String clsName) {
+    try {
+      Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0];
+      return (FileInputFormat) cons
+          .newInstance(carbonTable, dataMapExprWrapper, segments, partitionsToPrune,
+              BlockletDataMapFactory.class.getName());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Prune the segments from the already pruned blocklets.
    * @param segments
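Both methods above now go through Class.forName instead of referencing DistributableDataMapFormat at compile time, which keeps this module decoupled from the module that supplies the constructor. Relying on getDeclaredConstructors()[0] assumes the class declares exactly one constructor with this argument shape. A more explicit alternative (a hedged sketch, not what the patch does) would pin the signature so a mismatch fails fast; it assumes the constructor is public:

    // Fail fast if the expected constructor shape ever changes.
    Constructor<?> cons = Class.forName(clsName).getConstructor(CarbonTable.class,
        DataMapExprWrapper.class, List.class, List.class, String.class);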
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index f93be63..cd34bb8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -103,7 +103,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+  public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
     if (carbonTable == null) {
       // carbon table should be created either from deserialized table info (schema saved in
       // hive metastore) or by reading schema in HDFS (schema saved in HDFS)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
index 64936aa..c439219 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -19,15 +19,21 @@ package org.apache.carbondata.hadoop.api;
 import java.io.Serializable;
 import java.util.List;
 
+import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
 /**
  * Distributable datamap job to execute the #DistributableDataMapFormat in cluster. it prunes the
  * datamaps distributably and returns the final blocklet list
  */
 public interface DataMapJob extends Serializable {
 
+  void execute(CarbonTable carbonTable, FileInputFormat<Void, BlockletDataMapIndexWrapper> format);
+
   List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
       FilterResolverIntf filter);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 36c7414..3208a28 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -22,6 +22,8 @@ import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Locale;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -49,6 +51,12 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  */
 public class CarbonInputFormatUtil {
 
+  /**
+   * Attribute for Carbon LOGGER.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonProperties.class.getName());
+
   public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
       AbsoluteTableIdentifier identifier, Job job) throws IOException {
@@ -58,6 +66,7 @@ public class CarbonInputFormatUtil {
     CarbonTableInputFormat.setTableName(
         job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    setDataMapJobIfConfigured(job.getConfiguration());
     return carbonInputFormat;
   }
 
@@ -71,6 +80,7 @@ public class CarbonInputFormatUtil {
     CarbonTableInputFormat.setTableName(
         job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    setDataMapJobIfConfigured(job.getConfiguration());
     return carbonTableInputFormat;
   }
 
@@ -108,11 +118,10 @@ public class CarbonInputFormatUtil {
     CarbonInputFormat.setQuerySegment(conf, identifier);
     CarbonInputFormat.setFilterPredicates(conf, filterExpression);
     CarbonInputFormat.setColumnProjection(conf, columnProjection);
-    if (dataMapJob != null &&
-        Boolean.valueOf(CarbonProperties.getInstance().getProperty(
-            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT))) {
+    if (dataMapJob != null) {
       CarbonInputFormat.setDataMapJob(conf, dataMapJob);
+    } else {
+      setDataMapJobIfConfigured(conf);
     }
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
@@ -147,6 +156,32 @@ public class CarbonInputFormatUtil {
     return format;
   }
 
+  /**
+   * This method sets the DataMapJob if it is configured
+   *
+   * @param conf
+   * @throws IOException
+   */
+  public static void setDataMapJobIfConfigured(Configuration conf) throws IOException {
+    String className = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
+    CarbonTableInputFormat.setDataMapJob(conf, createDataMapJob(className));
+  }
+
+  /**
+   * Creates an instance of the DataMap job class
+   *
+   * @param className
+   * @return
+   */
+  public static Object createDataMapJob(String className) {
+    try {
+      return Class.forName(className).getDeclaredConstructors()[0].newInstance();
+    } catch (Exception e) {
+      LOGGER.error(e);
+      return null;
+    }
+  }
+
   public static String createJobTrackerID(java.util.Date date) {
     return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(date);
   }
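setDataMapJobIfConfigured degrades quietly when the Spark module is absent: createDataMapJob returns null if org.apache.carbondata.spark.rdd.SparkDataMapJob is not on the classpath, and setDataMapJob ignores a null job. A hedged usage sketch, assuming a Hadoop Configuration and an IOException-handling context:

    // Safe to call from any module; it is a no-op when the Spark datamap job class is missing.
    Configuration conf = new Configuration();
    CarbonInputFormatUtil.setDataMapJobIfConfigured(conf);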
http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 17c57b6..b0a59d4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -41,6 +41,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
         CarbonEnv.getDatabaseLocation("lucene", sqlContext.sparkSession))
+      .addProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, "true")
     sql("use lucene")
     sql("DROP TABLE IF EXISTS normal_test")
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 0d960d0..24a6927 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -56,6 +56,7 @@ import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
@@ -554,12 +555,7 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setQuerySegment(conf, identifier)
     CarbonInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonInputFormat.setColumnProjection(conf, columnProjection)
-    CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    if (CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
-      CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    }
+    CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -599,12 +595,7 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setQuerySegment(conf, identifier)
     CarbonInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonInputFormat.setColumnProjection(conf, columnProjection)
-    CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    if (CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-      CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
-      CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
-    }
+    CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 8d2f9ee..f51c3bc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -29,12 +29,12 @@ import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledExcepti
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
+import org.apache.carbondata.hadoop.api.{AbstractDataMapJob, DistributableDataMapFormat}
 
 /**
  * Spark job to execute datamap job and prune all the datamaps distributable
  */
-class SparkDataMapJob extends DataMapJob {
+class SparkDataMapJob extends AbstractDataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat,
       filter: FilterResolverIntf): util.List[ExtendedBlocklet] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 488a53d..6622246 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 
 case class CarbonCountStar(
     attributesRaw: Seq[Attribute],
@@ -74,11 +75,13 @@ case class CarbonCountStar(
     val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
     val jobConf: JobConf = new JobConf(new Configuration)
     SparkHadoopUtil.get.addCredentials(jobConf)
+    CarbonInputFormat.setTableInfo(jobConf, carbonTable.getTableInfo)
     val job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
     CarbonInputFormat
       .setTransactionalTable(job.getConfiguration, carbonTable.getTableInfo.isTransactionalTable)
+    CarbonInputFormatUtil.setDataMapJobIfConfigured(job.getConfiguration)
     (job, carbonInputFormat)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531ecdf3/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index cce23dc..29dcec9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -115,6 +115,15 @@ object CarbonSetCommand {
           "\" carbon.datamap.visible.<database_name>.<table_name>.<database_name>" +
           " = <true/false> \" format")
       }
+    } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+      if (key.split("\\.").length == 6) {
+        sessionParams.addProperty(key.toLowerCase(), value)
+      }
+      else {
+        throw new MalformedCarbonCommandException(
+          "property should be in \" carbon.load.datamaps.parallel.<database_name>" +
+          ".<table_name>=<true/false> \" format.")
+      }
     }
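With the SessionParams validation and the CarbonSetCommand branch above, the new property can be toggled per table from a session. The key must have exactly six dot-separated parts and a boolean value; a hedged example of the expected shape (database and table names here are illustrative):

    SET carbon.load.datamaps.parallel.default.my_table=true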