Repository: carbondata Updated Branches: refs/heads/make_carbontablepath_static [created] 817714bc9
http://git-wip-us.apache.org/repos/asf/carbondata/blob/35564379/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java index f51ced3..d8bc86b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java @@ -34,20 +34,16 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.DataTypeConverter; /** * Executor class for executing the query on the selected segments to be merged. @@ -70,6 +66,9 @@ public class CarbonCompactionExecutor { */ private boolean restructuredBlockExists; + // converter for UTF8String and decimal conversion + private DataTypeConverter dataTypeConverter; + /** * Constructor * @@ -82,13 +81,14 @@ public class CarbonCompactionExecutor { public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping, SegmentProperties segmentProperties, CarbonTable carbonTable, Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, - boolean restructuredBlockExists) { + boolean restructuredBlockExists, DataTypeConverter dataTypeConverter) { this.segmentMapping = segmentMapping; this.destinationSegProperties = segmentProperties; this.carbonTable = carbonTable; this.dataFileMetadataSegMapping = dataFileMetadataSegMapping; this.restructuredBlockExists = restructuredBlockExists; - queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + this.queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + this.dataTypeConverter = dataTypeConverter; } /** @@ -100,7 +100,10 @@ public class CarbonCompactionExecutor { List<RawResultIterator> resultList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List<TableBlockInfo> list = null; - queryModel = prepareQueryModel(list); + queryModel = carbonTable.createQueryModelWithProjectAllColumns(); + queryModel.setConverter(dataTypeConverter); + queryModel.setReadPageByPage(enablePageLevelReaderForCompaction()); + queryModel.setForcedDetailRawQuery(true); // iterate each seg ID for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) { String segmentId = taskMap.getKey(); @@ -156,7 +159,7 @@ public class CarbonCompactionExecutor { * @param blockList * @return */ - private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList) + private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList) throws QueryExecutionException, IOException { queryModel.setTableBlockInfos(blockList); QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); @@ -195,48 +198,6 @@ public class CarbonCompactionExecutor { } /** - * Preparing of the query model. - * - * @param blockList - * @return - */ - private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) { - QueryModel model = new QueryModel(); - model.setTableBlockInfos(blockList); - model.setForcedDetailRawQuery(true); - model.setFilterExpressionResolverTree(null); - model.setConverter(DataTypeUtil.getDataTypeConverter()); - model.setReadPageByPage(enablePageLevelReaderForCompaction()); - - List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getTableName()); - for (CarbonDimension dim : dimensions) { - // check if dimension is deleted - QueryDimension queryDimension = new QueryDimension(dim.getColName()); - queryDimension.setDimension(dim); - dims.add(queryDimension); - } - model.setQueryDimension(dims); - - List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getTableName()); - for (CarbonMeasure carbonMeasure : measures) { - // check if measure is deleted - QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName()); - queryMeasure.setMeasure(carbonMeasure); - msrs.add(queryMeasure); - } - model.setQueryMeasures(msrs); - model.setQueryId(System.nanoTime() + ""); - model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier()); - model.setTable(carbonTable); - return model; - } - - /** * Whether to enable page level reader for compaction or not. */ private boolean enablePageLevelReaderForCompaction() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/35564379/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java index 79e9e5a..b6f12a5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.processing.partition.DataPartitioner; import org.apache.carbondata.processing.partition.Partition; @@ -46,9 +45,8 @@ public final class QueryPartitionHelper { /** * Get partitions applicable for query based on filters applied in query */ - public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) { - String tableUniqueName = - CarbonTable.buildUniqueName(queryPlan.getDatabaseName(), queryPlan.getTableName()); + public List<Partition> getPartitionsForQuery(String databaseName, String tableName) { + String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName); DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName); http://git-wip-us.apache.org/repos/asf/carbondata/blob/35564379/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java index 36e022b..01db4f6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java @@ -18,7 +18,6 @@ package org.apache.carbondata.processing.partition.spliter; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -26,19 +25,14 @@ import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.util.CarbonUtil; public abstract class AbstractCarbonQueryExecutor { @@ -47,8 +41,8 @@ public abstract class AbstractCarbonQueryExecutor { LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName()); protected CarbonTable carbonTable; protected QueryModel queryModel; - protected QueryExecutor queryExecutor; - protected Map<String, TaskBlockInfo> segmentMapping; + private QueryExecutor queryExecutor; + Map<String, TaskBlockInfo> segmentMapping; /** * get executor and execute the query model. @@ -56,7 +50,7 @@ public abstract class AbstractCarbonQueryExecutor { * @param blockList * @return */ - protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList) + CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList) throws QueryExecutionException, IOException { queryModel.setTableBlockInfos(blockList); this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); @@ -64,46 +58,6 @@ public abstract class AbstractCarbonQueryExecutor { } /** - * Preparing of the query model. - * - * @param blockList - * @return - */ - protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) { - QueryModel model = new QueryModel(); - model.setTableBlockInfos(blockList); - model.setForcedDetailRawQuery(true); - model.setFilterExpressionResolverTree(null); - - List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getTableName()); - for (CarbonDimension dim : dimensions) { - // check if dimension is deleted - QueryDimension queryDimension = new QueryDimension(dim.getColName()); - queryDimension.setDimension(dim); - dims.add(queryDimension); - } - model.setQueryDimension(dims); - - List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getTableName()); - for (CarbonMeasure carbonMeasure : measures) { - // check if measure is deleted - QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName()); - queryMeasure.setMeasure(carbonMeasure); - msrs.add(queryMeasure); - } - model.setQueryMeasures(msrs); - model.setQueryId(System.nanoTime() + ""); - model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier()); - model.setTable(carbonTable); - return model; - } - - /** * Below method will be used * for cleanup */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/35564379/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java index 6afec0b..abe5057 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java @@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator; +import org.apache.carbondata.core.util.DataTypeConverterImpl; /** * Used to read carbon blocks when add/split partition @@ -48,7 +49,9 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor { public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId) throws QueryExecutionException, IOException { List<TableBlockInfo> list = null; - queryModel = prepareQueryModel(list); + queryModel = carbonTable.createQueryModelWithProjectAllColumns(); + queryModel.setConverter(new DataTypeConverterImpl()); + queryModel.setForcedDetailRawQuery(true); List<PartitionSpliterRawResultIterator> resultList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId); http://git-wip-us.apache.org/repos/asf/carbondata/blob/35564379/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 9f3c86f..56e185f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -31,7 +31,6 @@ import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonUtil; @@ -62,7 +61,7 @@ public class CarbonFactDataHandlerModel { return blockSize; } - public void setBlockSizeInMB(int blockSize) { + private void setBlockSizeInMB(int blockSize) { this.blockSize = blockSize; } @@ -78,10 +77,6 @@ public class CarbonFactDataHandlerModel { * local store location */ private String[] storeLocation; - /** - * flag to check whether use inverted index - */ - private boolean[] isUseInvertedIndex; /** * length of each dimension, including dictionary, nodictioncy, complex dimension @@ -171,8 +166,6 @@ public class CarbonFactDataHandlerModel { int taskExtension) { CarbonTableIdentifier identifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); - boolean[] isUseInvertedIndex = - CarbonDataProcessorUtil.getIsUseInvertedIndex(configuration.getDataFields()); int[] dimLensWithComplex = configuration.getCardinalityFinder().getCardinality(); if (!configuration.isSortTable()) { @@ -249,7 +242,6 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen); carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes); carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath); - carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndex); carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB()); carbonFactDataHandlerModel.setComplexDimensionKeyGenerator( configuration.createKeyGeneratorForComplexDimension()); @@ -311,13 +303,6 @@ public class CarbonFactDataHandlerModel { .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath); - List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName); - boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()]; - int index = 0; - for (CarbonDimension dimension : dimensionByTableName) { - isUseInvertedIndexes[index++] = dimension.isUseInvertedIndex(); - } - carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes); carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality()); carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB()); @@ -346,7 +331,7 @@ public class CarbonFactDataHandlerModel { return colCardinality; } - public void setColCardinality(int[] colCardinality) { + private void setColCardinality(int[] colCardinality) { this.colCardinality = colCardinality; } public CarbonDataFileAttributes getCarbonDataFileAttributes() { @@ -377,7 +362,7 @@ public class CarbonFactDataHandlerModel { return measureCount; } - public void setMeasureCount(int measureCount) { + private void setMeasureCount(int measureCount) { this.measureCount = measureCount; } @@ -393,7 +378,7 @@ public class CarbonFactDataHandlerModel { return dimLens; } - public void setDimLens(int[] dimLens) { + private void setDimLens(int[] dimLens) { this.dimLens = dimLens; } @@ -401,7 +386,7 @@ public class CarbonFactDataHandlerModel { return noDictionaryCount; } - public void setNoDictionaryCount(int noDictionaryCount) { + private void setNoDictionaryCount(int noDictionaryCount) { this.noDictionaryCount = noDictionaryCount; } @@ -409,7 +394,7 @@ public class CarbonFactDataHandlerModel { return dimensionCount; } - public void setDimensionCount(int dimensionCount) { + private void setDimensionCount(int dimensionCount) { this.dimensionCount = dimensionCount; } @@ -417,7 +402,7 @@ public class CarbonFactDataHandlerModel { return complexIndexMap; } - public void setComplexIndexMap(Map<Integer, GenericDataType> complexIndexMap) { + private void setComplexIndexMap(Map<Integer, GenericDataType> complexIndexMap) { this.complexIndexMap = complexIndexMap; } @@ -425,7 +410,7 @@ public class CarbonFactDataHandlerModel { return primitiveDimLens; } - public void setPrimitiveDimLens(int[] primitiveDimLens) { + private void setPrimitiveDimLens(int[] primitiveDimLens) { this.primitiveDimLens = primitiveDimLens; } @@ -433,7 +418,7 @@ public class CarbonFactDataHandlerModel { return measureDataType; } - public void setMeasureDataType(DataType[] measureDataType) { + private void setMeasureDataType(DataType[] measureDataType) { this.measureDataType = measureDataType; } @@ -441,7 +426,7 @@ public class CarbonFactDataHandlerModel { return carbonDataDirectoryPath; } - public void setCarbonDataDirectoryPath(String carbonDataDirectoryPath) { + private void setCarbonDataDirectoryPath(String carbonDataDirectoryPath) { this.carbonDataDirectoryPath = carbonDataDirectoryPath; } @@ -461,13 +446,6 @@ public class CarbonFactDataHandlerModel { isCompactionFlow = compactionFlow; } - public boolean[] getIsUseInvertedIndex() { - return isUseInvertedIndex; - } - - public void setIsUseInvertedIndex(boolean[] isUseInvertedIndex) { - this.isUseInvertedIndex = isUseInvertedIndex; - } /** * * @return segmentProperties @@ -494,7 +472,7 @@ public class CarbonFactDataHandlerModel { /** * @param wrapperColumnSchema */ - public void setWrapperColumnSchema(List<ColumnSchema> wrapperColumnSchema) { + private void setWrapperColumnSchema(List<ColumnSchema> wrapperColumnSchema) { this.wrapperColumnSchema = wrapperColumnSchema; } @@ -508,7 +486,7 @@ public class CarbonFactDataHandlerModel { return schemaUpdatedTimeStamp; } - public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) { + private void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) { this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp; } @@ -528,7 +506,7 @@ public class CarbonFactDataHandlerModel { return complexDimensionKeyGenerator; } - public void setComplexDimensionKeyGenerator(KeyGenerator[] complexDimensionKeyGenerator) { + private void setComplexDimensionKeyGenerator(KeyGenerator[] complexDimensionKeyGenerator) { this.complexDimensionKeyGenerator = complexDimensionKeyGenerator; } @@ -554,10 +532,6 @@ public class CarbonFactDataHandlerModel { return count; } - public boolean isSortColumn(int columnIndex) { - return columnIndex < segmentProperties.getNumberOfSortColumns(); - } - public TableSpec getTableSpec() { return tableSpec; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/35564379/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java index ec91472..4abdf3c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java @@ -24,7 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.scan.model.CarbonQueryPlan; +import org.apache.carbondata.core.scan.model.QueryProjection; import org.apache.carbondata.processing.partition.Partition; import org.apache.carbondata.processing.partition.impl.DefaultLoadBalancer; import org.apache.carbondata.processing.partition.impl.PartitionMultiFileImpl; @@ -46,7 +46,7 @@ public class CarbonQueryUtil { * It creates the one split for each region server. */ public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName, - CarbonQueryPlan queryPlan) { + QueryProjection queryPlan) { //Just create splits depends on locations of region servers List<Partition> allPartitions = null; @@ -55,7 +55,7 @@ public class CarbonQueryUtil { QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName); } else { allPartitions = - QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan); + QueryPartitionHelper.getInstance().getPartitionsForQuery(databaseName, tableName); } TableSplit[] splits = new TableSplit[allPartitions.size()]; for (int i = 0; i < splits.length; i++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/35564379/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 36a5a15..197cb14 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -150,7 +150,7 @@ class StreamHandoffRDD[K, V]( CarbonTableInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo) val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) val format = new CarbonTableInputFormat[Array[Object]]() - val model = format.getQueryModel(inputSplit, attemptContext) + val model = format.createQueryModel(inputSplit, attemptContext) val inputFormat = new CarbonStreamInputFormat val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext) .asInstanceOf[CarbonStreamRecordReader]
