http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java index f4450e3..5f8d199 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java @@ -31,7 +31,7 @@ import java.util.Map; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk; +import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage; import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel; import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -45,7 +45,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.scan.model.QueryDimension; +import org.apache.carbondata.core.scan.model.ProjectionDimension; import mockit.Mock; import mockit.MockUp; @@ -267,8 +267,8 @@ public class CarbonUtilTest { @Test public void testToGetNextLesserValue() { byte[] dataChunks = { 5, 6, 7, 8, 9 }; byte[] compareValues = { 7 }; - FixedLengthDimensionDataChunk fixedLengthDataChunk = - new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1); + FixedLengthDimensionColumnPage fixedLengthDataChunk = + new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1); int result = CarbonUtil.nextLesserValueToTarget(2, fixedLengthDataChunk, compareValues); assertEquals(result, 1); } @@ -276,8 +276,8 @@ public class CarbonUtilTest { @Test public void testToGetNextLesserValueToTarget() { byte[] dataChunks = { 7, 7, 7, 8, 9 }; byte[] compareValues = { 7 }; - FixedLengthDimensionDataChunk fixedLengthDataChunk = - new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1); + FixedLengthDimensionColumnPage fixedLengthDataChunk = + new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1); int result = CarbonUtil.nextLesserValueToTarget(2, fixedLengthDataChunk, compareValues); assertEquals(result, -1); } @@ -285,8 +285,8 @@ public class CarbonUtilTest { @Test public void testToGetnextGreaterValue() { byte[] dataChunks = { 5, 6, 7, 8, 9 }; byte[] compareValues = { 7 }; - FixedLengthDimensionDataChunk fixedLengthDataChunk = - new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1); + FixedLengthDimensionColumnPage fixedLengthDataChunk = + new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1); int result = CarbonUtil.nextGreaterValueToTarget(2, fixedLengthDataChunk, compareValues, 5); assertEquals(result, 3); } @@ -302,8 +302,8 @@ public class CarbonUtilTest { @Test public void testToGetnextGreaterValueToTarget() { byte[] dataChunks = { 5, 6, 7, 7, 7 }; byte[] compareValues = { 7 }; - FixedLengthDimensionDataChunk fixedLengthDataChunk = - new FixedLengthDimensionDataChunk(dataChunks, null, null, 5, 1); + FixedLengthDimensionColumnPage fixedLengthDataChunk = + new 
FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1); int result = CarbonUtil.nextGreaterValueToTarget(2, fixedLengthDataChunk, compareValues, 5); assertEquals(result, 5); } @@ -525,23 +525,23 @@ public class CarbonUtilTest { } @Test public void testToGetDictionaryEncodingArray() { - QueryDimension column1 = new QueryDimension("Column1"); - QueryDimension column2 = new QueryDimension("Column2"); ColumnSchema column1Schema = new ColumnSchema(); ColumnSchema column2Schema = new ColumnSchema(); column1Schema.setColumnName("Column1"); List<Encoding> encoding = new ArrayList<>(); encoding.add(Encoding.DICTIONARY); column1Schema.setEncodingList(encoding); - column1.setDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1)); + ProjectionDimension + column1 = new ProjectionDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1)); column2Schema.setColumnName("Column2"); List<Encoding> encoding2 = new ArrayList<>(); encoding2.add(Encoding.DELTA); column2Schema.setEncodingList(encoding2); - column2.setDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1)); + ProjectionDimension + column2 = new ProjectionDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1)); - QueryDimension[] queryDimensions = { column1, column2 }; + ProjectionDimension[] queryDimensions = { column1, column2 }; boolean[] dictionaryEncoding = CarbonUtil.getDictionaryEncodingArray(queryDimensions); boolean[] expectedDictionaryEncoding = { true, false }; @@ -551,23 +551,23 @@ public class CarbonUtilTest { } @Test public void testToGetDirectDictionaryEncodingArray() { - QueryDimension column1 = new QueryDimension("Column1"); - QueryDimension column2 = new QueryDimension("Column2"); ColumnSchema column1Schema = new ColumnSchema(); ColumnSchema column2Schema = new ColumnSchema(); column1Schema.setColumnName("Column1"); List<Encoding> encoding = new ArrayList<>(); encoding.add(Encoding.DIRECT_DICTIONARY); column1Schema.setEncodingList(encoding); - column1.setDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1)); + ProjectionDimension + column1 = new ProjectionDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1)); column2Schema.setColumnName("Column2"); List<Encoding> encoding2 = new ArrayList<>(); encoding2.add(Encoding.DELTA); column2Schema.setEncodingList(encoding2); - column2.setDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1)); + ProjectionDimension + column2 = new ProjectionDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1)); - QueryDimension[] queryDimensions = { column1, column2 }; + ProjectionDimension[] queryDimensions = { column1, column2 }; boolean[] dictionaryEncoding = CarbonUtil.getDirectDictionaryEncodingArray(queryDimensions); boolean[] expectedDictionaryEncoding = { true, false }; @@ -577,19 +577,19 @@ public class CarbonUtilTest { } @Test public void testToGetComplexDataTypeArray() { - QueryDimension column1 = new QueryDimension("Column1"); - QueryDimension column2 = new QueryDimension("Column2"); ColumnSchema column1Schema = new ColumnSchema(); ColumnSchema column2Schema = new ColumnSchema(); column1Schema.setColumnName("Column1"); column1Schema.setDataType(DataTypes.DATE); - column1.setDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1)); + ProjectionDimension + column1 = new ProjectionDimension(new CarbonDimension(column1Schema, 1, 1, 1, 1)); column2Schema.setColumnName("Column2"); column2Schema.setDataType(DataTypes.createDefaultArrayType()); - column2.setDimension(new CarbonDimension(column2Schema, 1, 1, 1, 1)); + ProjectionDimension + column2 = new ProjectionDimension(new 
CarbonDimension(column2Schema, 1, 1, 1, 1)); - QueryDimension[] queryDimensions = { column1, column2 }; + ProjectionDimension[] queryDimensions = { column1, column2 }; boolean[] dictionaryEncoding = CarbonUtil.getComplexDataTypeArray(queryDimensions); boolean[] expectedDictionaryEncoding = { false, true }; @@ -806,8 +806,8 @@ public class CarbonUtilTest { @Test public void testToGetFirstIndexUsingBinarySearchWithCompareTo1() { byte[] dataChunks = { 10, 20, 30, 40, 50, 60 }; byte[] compareValue = { 5 }; - FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk = - new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1); + FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk = + new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1); int result = CarbonUtil .getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, compareValue, false); assertEquals(-2, result); @@ -816,8 +816,8 @@ public class CarbonUtilTest { @Test public void testToGetFirstIndexUsingBinarySearchWithCompareToLessThan0() { byte[] dataChunks = { 10, 20, 30, 40, 50, 60 }; byte[] compareValue = { 30 }; - FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk = - new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1); + FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk = + new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1); int result = CarbonUtil .getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, compareValue, false); assertEquals(2, result); @@ -826,8 +826,8 @@ public class CarbonUtilTest { @Test public void testToGetFirstIndexUsingBinarySearchWithCompareTo0() { byte[] dataChunks = { 10, 10, 10, 40, 50, 60 }; byte[] compareValue = { 10 }; - FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk = - new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1); + FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk = + new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1); int result = CarbonUtil .getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, compareValue, false); assertEquals(0, result); @@ -836,8 +836,8 @@ public class CarbonUtilTest { @Test public void testToGetFirstIndexUsingBinarySearchWithMatchUpLimitTrue() { byte[] dataChunks = { 10, 10, 10, 40, 50, 60 }; byte[] compareValue = { 10 }; - FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk = - new FixedLengthDimensionDataChunk(dataChunks, null, null, 6, 1); + FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk = + new FixedLengthDimensionColumnPage(dataChunks, null, null, 6, 1); int result = CarbonUtil .getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, compareValue, true); assertEquals(2, result); @@ -847,13 +847,13 @@ public class CarbonUtilTest { public void testBinaryRangeSearch() { byte[] dataChunk = new byte[10]; - FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk; + FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk; byte[] keyWord = new byte[1]; int[] range; dataChunk = "abbcccddddeffgggh".getBytes(); byte[][] dataArr = new byte[dataChunk.length / keyWord.length][keyWord.length]; - fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null, + fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null, dataChunk.length / keyWord.length, keyWord.length); for (int ii = 0; ii < dataChunk.length / keyWord.length; ii++) { @@ -885,7 +885,7 @@ public class CarbonUtilTest { assertRangeIndex(dataArr, dataChunk, 
fixedLengthDimensionDataChunk, keyWord, expectRangeIndex); dataChunk = "ab".getBytes(); - fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null, + fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null, dataChunk.length / keyWord.length, keyWord.length); keyWord[0] = Byte.valueOf("97"); @@ -899,7 +899,7 @@ public class CarbonUtilTest { assertEquals(1, range[1]); dataChunk = "aabb".getBytes(); - fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null, + fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null, dataChunk.length / keyWord.length, keyWord.length); keyWord[0] = Byte.valueOf("97"); @@ -913,7 +913,7 @@ public class CarbonUtilTest { assertEquals(3, range[1]); dataChunk = "a".getBytes(); - fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null, + fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null, dataChunk.length / keyWord.length, keyWord.length); keyWord[0] = Byte.valueOf("97"); @@ -922,7 +922,7 @@ public class CarbonUtilTest { assertEquals(0, range[1]); dataChunk = "aa".getBytes(); - fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null, + fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null, dataChunk.length / keyWord.length, keyWord.length); keyWord[0] = Byte.valueOf("97"); @@ -931,7 +931,7 @@ public class CarbonUtilTest { assertEquals(1, range[1]); dataChunk = "aabbbbbbbbbbcc".getBytes(); - fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null, + fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null, dataChunk.length / keyWord.length, keyWord.length); keyWord[0] = Byte.valueOf("98"); range = CarbonUtil.getRangeIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 0, dataChunk.length - 1, keyWord); @@ -944,14 +944,14 @@ public class CarbonUtilTest { public void IndexUsingBinarySearchLengthTwo() { byte[] dataChunk = new byte[10]; - FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk; + FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk; byte[] keyWord = new byte[2]; dataChunk = "aabbbbbbbbbbcc".getBytes(); byte[][] dataArr = new byte[dataChunk.length / keyWord.length][keyWord.length]; - fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null, + fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null, dataChunk.length / keyWord.length, keyWord.length); for (int ii = 0; ii < dataChunk.length / keyWord.length; ii++) { @@ -986,14 +986,14 @@ public class CarbonUtilTest { public void IndexUsingBinarySearchLengthThree() { byte[] dataChunk = new byte[10]; - FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk; + FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk; byte[] keyWord = new byte[3]; dataChunk = "aaabbbbbbbbbccc".getBytes(); byte[][] dataArr = new byte[dataChunk.length / keyWord.length][keyWord.length]; - fixedLengthDimensionDataChunk = new FixedLengthDimensionDataChunk(dataChunk, null, null, + fixedLengthDimensionDataChunk = new FixedLengthDimensionColumnPage(dataChunk, null, null, dataChunk.length / keyWord.length, keyWord.length); for (int ii = 0; ii < dataChunk.length / keyWord.length; ii++) { @@ -1101,7 +1101,7 @@ public class CarbonUtilTest { } private void assertRangeIndex(byte[][] dataArr, byte[] dataChunk, - 
FixedLengthDimensionDataChunk fixedLengthDimensionDataChunk, byte[] keyWord, int[] expectRangeIndex) { + FixedLengthDimensionColumnPage fixedLengthDimensionDataChunk, byte[] keyWord, int[] expectRangeIndex) { int[] range; range = CarbonUtil.getRangeIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 0, (dataChunk.length - 1) / keyWord.length, keyWord);
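Taken together, the CarbonUtilTest hunks above are mechanical API renames: FixedLengthDimensionDataChunk becomes FixedLengthDimensionColumnPage with the same five-argument constructor, and QueryDimension is replaced by ProjectionDimension, which now receives its CarbonDimension through the constructor rather than a setDimension(...) call. A minimal sketch of the post-rename call shapes, using only constructors and helpers that appear in this diff; the wrapper class and main method are illustrative, not part of the patch:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
    import org.apache.carbondata.core.metadata.encoder.Encoding;
    import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    import org.apache.carbondata.core.scan.model.ProjectionDimension;
    import org.apache.carbondata.core.util.CarbonUtil;

    public class RenameUsageSketch {
      public static void main(String[] args) {
        // One byte per value, five rows, as in testToGetNextLesserValue above.
        byte[] dataChunks = { 5, 6, 7, 8, 9 };
        byte[] compareValues = { 7 };
        FixedLengthDimensionColumnPage page =
            new FixedLengthDimensionColumnPage(dataChunks, null, null, 5, 1);

        // The search helpers keep their pre-rename signatures; expected
        // results are the ones asserted in the tests above.
        int lesser = CarbonUtil.nextLesserValueToTarget(2, page, compareValues);      // 1
        int greater = CarbonUtil.nextGreaterValueToTarget(2, page, compareValues, 5); // 3

        // ProjectionDimension wraps the dimension at construction time.
        ColumnSchema schema = new ColumnSchema();
        schema.setColumnName("Column1");
        List<Encoding> encodings = new ArrayList<>();
        encodings.add(Encoding.DICTIONARY);
        schema.setEncodingList(encodings);
        ProjectionDimension column1 =
            new ProjectionDimension(new CarbonDimension(schema, 1, 1, 1, 1));

        ProjectionDimension[] queryDimensions = { column1 };
        boolean[] dictionaryEncoding = CarbonUtil.getDictionaryEncodingArray(queryDimensions); // { true }
      }
    }

The boolean-array helpers (getDictionaryEncodingArray, getDirectDictionaryEncodingArray, getComplexDataTypeArray) are otherwise unchanged; only the element type they accept moved from QueryDimension[] to ProjectionDimension[].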
http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java index de64c0a..e506994 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java @@ -25,10 +25,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.carbondata.core.datastore.FileHolder; +import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.datastore.impl.FileHolderImpl; +import org.apache.carbondata.core.datastore.impl.FileReaderImpl; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.blocklet.SegmentInfo; @@ -229,13 +229,13 @@ public class DataFileFooterConverterTest { } @SuppressWarnings("unused") @Mock - public FileHolder getFileHolder(FileFactory.FileType fileType) { - return new FileHolderImpl(); + public FileReader getFileHolder(FileFactory.FileType fileType) { + return new FileReaderImpl(); } }; - new MockUp<FileHolderImpl>() { + new MockUp<FileReaderImpl>() { @SuppressWarnings("unused") @Mock public long readLong(String filePath, long offset) { return 1; } @@ -249,7 +249,6 @@ public class DataFileFooterConverterTest { SegmentInfo segmentInfo = new SegmentInfo(); int[] arr = { 1, 2, 3 }; segmentInfo.setColumnCardinality(arr); - segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols()); dataFileFooter.setNumberOfRows(3); dataFileFooter.setSegmentInfo(segmentInfo); TableBlockInfo info = new TableBlockInfo("/file.carbondata", 1, "0", new String[0], 1, ColumnarFormatVersion.V1, null); http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java b/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java index 4c9a784..4fb5dcc 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/RangeFilterProcessorTest.java @@ -36,7 +36,6 @@ import org.apache.carbondata.core.scan.expression.logical.RangeExpression; import org.apache.carbondata.core.scan.expression.logical.TrueExpression; import org.apache.carbondata.core.scan.filter.executer.RangeValueFilterExecuterImpl; import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer; -import org.apache.carbondata.core.scan.filter.intf.FilterOptimizerBasic; import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer; import mockit.Deencapsulation; @@ -102,7 +101,7 @@ public class RangeFilterProcessorTest { new LessThanEqualToExpression(new ColumnExpression("a", DataTypes.STRING), new LiteralExpression("20", DataTypes.STRING))), new TrueExpression(null)); FilterOptimizer rangeFilterOptimizer = - new 
RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter); + new RangeFilterOptmizer(inputFilter); rangeFilterOptimizer.optimizeFilter(); result = checkBothTrees(inputFilter, output); Assert.assertTrue(result); @@ -143,7 +142,7 @@ public class RangeFilterProcessorTest { new LessThanEqualToExpression(new ColumnExpression("a", DataTypes.STRING), new LiteralExpression("05", DataTypes.STRING))); FilterOptimizer rangeFilterOptimizer = - new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter); + new RangeFilterOptmizer(inputFilter); rangeFilterOptimizer.optimizeFilter(); result = checkBothTrees(inputFilter, output); // no change @@ -218,7 +217,7 @@ public class RangeFilterProcessorTest { Expression Andb3 = new AndExpression(Andb2, new TrueExpression(null)); FilterOptimizer rangeFilterOptimizer = - new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter); + new RangeFilterOptmizer(inputFilter); rangeFilterOptimizer.optimizeFilter(); result = checkBothTrees(inputFilter, new AndExpression(Andb3, new TrueExpression(null))); // no change @@ -302,7 +301,7 @@ public class RangeFilterProcessorTest { Expression Orb3 = new OrExpression(Orb2, lessThanb2); FilterOptimizer rangeFilterOptimizer = - new RangeFilterOptmizer(new FilterOptimizerBasic(), inputFilter); + new RangeFilterOptmizer(inputFilter); rangeFilterOptimizer.optimizeFilter(); result = checkBothTrees(inputFilter, new OrExpression(Orb3, lessThanb1)); // no change http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java b/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java deleted file mode 100644 index 94c3f68..0000000 --- a/core/src/test/java/org/apache/carbondata/scanner/impl/FilterScannerTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.scanner.impl; - -import static junit.framework.TestCase.assertEquals; - -public class FilterScannerTest { -// -// private static FilterScanner filterScanner; -// private static BlockletIndex blockletIndex; -// private static BlockletMinMaxIndex blockletMinMaxIndex; -// private static BTreeBuilderInfo bTreeBuilderInfo; -// private static DataFileFooter dataFileFooter; -// -// @BeforeClass public static void setUp() { -// BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo(); -// FilterExecuter filterExecutor = new AndFilterExecuterImpl(null, null); -// blockExecutionInfo.setFilterExecuterTree(filterExecutor); -// blockExecutionInfo.setFixedLengthKeySize(1); -// blockExecutionInfo.setNoDictionaryBlockIndexes(new int[] { 1, 2 }); -// blockExecutionInfo.setDictionaryColumnBlockIndex(new int[] { 1 }); -// blockExecutionInfo.setColumnGroupToKeyStructureInfo(new HashMap<Integer, KeyStructureInfo>()); -// blockExecutionInfo.setComplexDimensionInfoMap(new HashMap<Integer, GenericQueryType>()); -// blockExecutionInfo.setComplexColumnParentBlockIndexes(new int[] { 1 }); -// blockExecutionInfo.setQueryDimensions(new QueryDimension[] { new QueryDimension("Col1") }); -// blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[][] { { 0, 0 } }); -// blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[][] { { 0, 0 } }); -// blockExecutionInfo.setTotalNumberOfMeasureBlock(1); -// blockExecutionInfo.setTotalNumberDimensionBlock(1); -// QueryStatisticsModel queryStatisticsModel = new QueryStatisticsModel(); -// QueryStatistic queryStatistic = new QueryStatistic(); -// queryStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, 1); -// Map<String, QueryStatistic> statisticsTypeAndObjMap = new HashMap<>(); -// statisticsTypeAndObjMap.put(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, queryStatistic); -// statisticsTypeAndObjMap.put(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, queryStatistic); -// queryStatisticsModel.setStatisticsTypeAndObjMap(statisticsTypeAndObjMap); -// QueryStatisticsRecorder queryStatisticsRecorder = new QueryStatisticsRecorderImpl("1"); -// queryStatisticsModel.setRecorder(queryStatisticsRecorder); -// filterScanner = new FilterScanner(blockExecutionInfo, queryStatisticsModel); -// blockletIndex = new BlockletIndex(); -// blockletMinMaxIndex = new BlockletMinMaxIndex(); -// blockletMinMaxIndex.setMinValues(new byte[][] { { 1, 2 } }); -// blockletMinMaxIndex.setMaxValues(new byte[][] { { 10, 12 } }); -// blockletIndex.setMinMaxIndex(blockletMinMaxIndex); -// dataFileFooter = new DataFileFooter(); -// dataFileFooter.setBlockletIndex(blockletIndex); -// bTreeBuilderInfo = new BTreeBuilderInfo(Arrays.asList(dataFileFooter), new int[] { 1 }); -// } -// -// @Test public void testToScanBlockletWithEmptyBitSet() throws QueryExecutionException { -// new MockUp<AndFilterExecuterImpl>() { -// @SuppressWarnings("unused") @Mock -// public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) { -// return new BitSet(); -// } -// }; -// BlocksChunkHolder blocksChunkHolder = new BlocksChunkHolder(1, 1); -// DataRefNode dataRefNode = new BlockBTreeLeafNode(bTreeBuilderInfo, 0, 1); -// blocksChunkHolder.setDataBlock(dataRefNode); -// AbstractScannedResult abstractScannedResult = filterScanner.scanBlocklet(blocksChunkHolder); -// assertEquals(0, abstractScannedResult.numberOfOutputRows()); -// } -// -// @Test public void testToScanBlockletWithNonEmptyBitSet() throws QueryExecutionException { -// new 
MockUp<AndFilterExecuterImpl>() { -// @SuppressWarnings("unused") @Mock -// public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) { -// BitSet bitSet = new BitSet(); -// bitSet.set(1); -// bitSet.set(2); -// bitSet.set(1); -// return bitSet; -// } -// -// @SuppressWarnings("unused") @Mock -// public BitSet applyFilter(BlocksChunkHolder blockChunkHolder) -// throws FilterUnsupportedException { -// BitSet bitSet = new BitSet(); -// bitSet.set(1); -// bitSet.set(2); -// bitSet.set(1); -// return bitSet; -// } -// }; -// DataRefNode dataRefNode = new MockUp<DataRefNode>() { -// @Mock @SuppressWarnings("unused") DimensionColumnDataChunk[] getDimensionChunks( -// FileHolder fileReader, int[][] blockIndexes) { -// DimensionColumnDataChunk[] dimensionChunkAttributes = -// { new ColumnGroupDimensionDataChunk(null, null) }; -// return dimensionChunkAttributes; -// } -// -// @Mock @SuppressWarnings("unused") ColumnPage[] getMeasureChunks( -// FileHolder fileReader, int[][] blockIndexes) { -// -// ColumnPage[] ColumnPages = { new ColumnPage() }; -// return ColumnPages; -// } -// }.getMockInstance(); -// -// BlocksChunkHolder blocksChunkHolder = new BlocksChunkHolder(1, 1); -// blocksChunkHolder.setDataBlock(dataRefNode); -// DimensionChunkAttributes dimensionChunkAttributes = new DimensionChunkAttributes(); -// DimensionColumnDataChunk dimensionColumnDataChunk = -// new FixedLengthDimensionDataChunk(new byte[] { 0, 1 }, dimensionChunkAttributes); -// blocksChunkHolder.setDimensionRawDataChunk(new DimensionColumnDataChunk[] -// -// { dimensionColumnDataChunk }); -// ColumnPage ColumnPage = new ColumnPage(); -// blocksChunkHolder.setMeasureDataChunk(new ColumnPage[] -// -// { ColumnPage }); -// FileHolder fileHolder = new DFSFileHolderImpl(); -// blocksChunkHolder.setFileReader(fileHolder); -// AbstractScannedResult abstractScannedResult = filterScanner.scanBlocklet(blocksChunkHolder); -// -// assertEquals(2, abstractScannedResult.numberOfOutputRows()); -// } -// -// @Test(expected = QueryExecutionException.class) public void testToScanBlockletWithException() -// throws QueryExecutionException { -// new MockUp<AndFilterExecuterImpl>() { -// @SuppressWarnings("unused") @Mock -// public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) { -// BitSet bitSet = new BitSet(); -// bitSet.set(1); -// bitSet.set(2); -// bitSet.set(1); -// return bitSet; -// } -// -// @SuppressWarnings("unused") @Mock -// public BitSet applyFilter(BlocksChunkHolder blockChunkHolder) -// throws FilterUnsupportedException { -// throw new FilterUnsupportedException("Filter unsupported"); -// } -// }; -// BlocksChunkHolder blocksChunkHolder = new BlocksChunkHolder(1, 1); -// BTreeBuilderInfo bTreeBuilderInfo = -// new BTreeBuilderInfo(Arrays.asList(dataFileFooter), new int[] { 1 }); -// DataRefNode dataRefNode = new BlockBTreeLeafNode(bTreeBuilderInfo, 0, 1); -// blocksChunkHolder.setDataBlock(dataRefNode); -// filterScanner.scanBlocklet(blocksChunkHolder); -// } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/dev/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/dev/findbugs-exclude.xml b/dev/findbugs-exclude.xml index 1520cd4..b19db85 100644 --- a/dev/findbugs-exclude.xml +++ b/dev/findbugs-exclude.xml @@ -31,7 +31,7 @@ <Bug pattern="OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"/> </Match> <Match> - <Class name="org.apache.carbondata.core.datastore.impl.FileHolderImpl"/> + <Class 
name="org.apache.carbondata.core.datastore.impl.FileReaderImpl"/> <Method name="getDataInputStream"/> <Bug pattern="OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE"/> </Match> http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala index 76afcbf..7a15327 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala @@ -62,7 +62,7 @@ object CarbonSessionExample { spark.sql( s""" - | SELECT * + | SELECT charField, stringField, intField | FROM carbon_table | WHERE stringfield = 'spark' AND decimalField > 40 """.stripMargin).show() http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 069e1f7..087cf55 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -58,7 +58,6 @@ import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.core.scan.filter.SingleTableProvider; import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.stats.QueryStatistic; import org.apache.carbondata.core.stats.QueryStatisticsConstants; @@ -111,11 +110,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { // comma separated list of input segment numbers public static final String INPUT_SEGMENT_NUMBERS = "mapreduce.input.carboninputformat.segmentnumbers"; - public static final String VALIDATE_INPUT_SEGMENT_IDs = + private static final String VALIDATE_INPUT_SEGMENT_IDs = "mapreduce.input.carboninputformat.validsegments"; // comma separated list of input files public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files"; - public static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid"; + private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid"; private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class); private static final String FILTER_PREDICATE = "mapreduce.input.carboninputformat.filter.predicate"; @@ -126,7 +125,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr"; public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName"; public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; - public static final String PARTITIONS_TO_PRUNE = + private static final String PARTITIONS_TO_PRUNE = 
"mapreduce.input.carboninputformat.partitions.to.prune"; public static final String UPADTE_T = "mapreduce.input.carboninputformat.partitions.to.prune"; @@ -339,7 +338,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { * @return List<InputSplit> list of CarbonInputSplit * @throws IOException */ - @Override public List<InputSplit> getSplits(JobContext job) throws IOException { + @Override + public List<InputSplit> getSplits(JobContext job) throws IOException { AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier); CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); @@ -808,28 +808,29 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { return split; } - @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, + @Override + public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); - QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext); + QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); CarbonReadSupport<T> readSupport = getReadSupportClass(configuration); return new CarbonRecordReader<T>(queryModel, readSupport); } - public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException { Configuration configuration = taskAttemptContext.getConfiguration(); CarbonTable carbonTable = getOrCreateCarbonTable(configuration); TableProvider tableProvider = new SingleTableProvider(carbonTable); - // getting the table absoluteTableIdentifier from the carbonTable - // to avoid unnecessary deserialization - AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); // query plan includes projection column - String projection = getColumnProjection(configuration); - CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection); - QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable, - getDataTypeConverter(configuration)); + String projectionString = getColumnProjection(configuration); + String[] projectionColumnNames = null; + if (projectionString != null) { + projectionColumnNames = projectionString.split(","); + } + QueryModel queryModel = carbonTable.createQueryWithProjection( + projectionColumnNames, getDataTypeConverter(configuration)); // set the filter to the query model in order to filter blocklet before scan Expression filter = getFilterPredicates(configuration); @@ -884,7 +885,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { return readSupport; } - @Override protected boolean isSplitable(JobContext context, Path filename) { + @Override + protected boolean isSplitable(JobContext context, Path filename) { try { // Don't split the file if it is local file system FileSystem fileSystem = filename.getFileSystem(context.getConfiguration()); @@ -898,16 +900,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { } /** - * required to be moved to core - * - * @return updateExtension - */ - private String getUpdateExtension() { - // TODO: required to modify when supporting update, mostly will be update timestamp - 
return "update"; - } - - /** * return valid segment to access */ private Segment[] getSegmentsToAccess(JobContext job) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java index a590a5b..0fe0cbf 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java @@ -176,9 +176,7 @@ class InMemoryBTreeIndex implements Index { filterredBlocks = filterExpressionProcessor.getFilterredBlocks( abstractIndex.getDataRefNode(), resolver, - abstractIndex, - identifier - ); + abstractIndex); } resultFilterredBlocks.addAll(filterredBlocks); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java index 95a7af0..1e227c4 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java @@ -156,7 +156,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> { hadoopConf = context.getConfiguration(); if (model == null) { CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); - model = format.getQueryModel(split, context); + model = format.createQueryModel(split, context); } carbonTable = model.getTable(); List<CarbonDimension> dimensions = http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java index 89a4a9a..2f28861 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java @@ -67,7 +67,7 @@ public class BlockLevelTraverser { blockName = CarbonTablePath.getCarbonDataFileName(blockName); blockName = blockName + CarbonTablePath.getCarbonDataExtension(); - long rowCount = currentBlock.nodeSize(); + long rowCount = currentBlock.numRows(); String key = CarbonUpdateUtil.getSegmentBlockNameKey(segId, blockName); http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index 056c27b..9f8c5ec 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -26,18 +26,12 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer; -import org.apache.carbondata.core.scan.filter.intf.FilterOptimizerBasic; import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.scan.model.CarbonQueryPlan; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; @@ -52,45 +46,14 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; */ public class CarbonInputFormatUtil { - public static CarbonQueryPlan createQueryPlan(CarbonTable carbonTable, String columnString) { - String[] columns = null; - if (columnString != null) { - columns = columnString.split(","); - } - String factTableName = carbonTable.getTableName(); - CarbonQueryPlan plan = new CarbonQueryPlan(carbonTable.getDatabaseName(), factTableName); - // fill dimensions - // If columns are null, set all dimensions and measures - int i = 0; - if (columns != null) { - for (String column : columns) { - CarbonDimension dimensionByName = carbonTable.getDimensionByName(factTableName, column); - if (dimensionByName != null) { - addQueryDimension(plan, i, dimensionByName); - i++; - } else { - CarbonMeasure measure = carbonTable.getMeasureByName(factTableName, column); - if (measure == null) { - throw new RuntimeException(column + " column not found in the table " + factTableName); - } - addQueryMeasure(plan, i, measure); - i++; - } - } - } - - plan.setQueryId(System.nanoTime() + ""); - return plan; - } - public static <V> CarbonTableInputFormat<V> createCarbonInputFormat( AbsoluteTableIdentifier identifier, Job job) throws IOException { CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>(); - carbonInputFormat.setDatabaseName(job.getConfiguration(), - identifier.getCarbonTableIdentifier().getDatabaseName()); - carbonInputFormat - .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName()); + CarbonTableInputFormat.setDatabaseName( + job.getConfiguration(), identifier.getCarbonTableIdentifier().getDatabaseName()); + CarbonTableInputFormat.setTableName( + job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName()); FileInputFormat.addInputPath(job, new Path(identifier.getTablePath())); return carbonInputFormat; } @@ -98,30 +61,16 @@ public class CarbonInputFormatUtil { public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat( AbsoluteTableIdentifier identifier, List<String> partitionId, Job job) throws IOException { CarbonTableInputFormat<V> carbonTableInputFormat = new CarbonTableInputFormat<>(); - 
carbonTableInputFormat.setPartitionIdList(job.getConfiguration(), partitionId); - carbonTableInputFormat.setDatabaseName(job.getConfiguration(), - identifier.getCarbonTableIdentifier().getDatabaseName()); - carbonTableInputFormat - .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName()); + CarbonTableInputFormat.setPartitionIdList( + job.getConfiguration(), partitionId); + CarbonTableInputFormat.setDatabaseName( + job.getConfiguration(), identifier.getCarbonTableIdentifier().getDatabaseName()); + CarbonTableInputFormat.setTableName( + job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName()); FileInputFormat.addInputPath(job, new Path(identifier.getTablePath())); return carbonTableInputFormat; } - private static void addQueryMeasure(CarbonQueryPlan plan, int order, CarbonMeasure measure) { - QueryMeasure queryMeasure = new QueryMeasure(measure.getColName()); - queryMeasure.setQueryOrder(order); - queryMeasure.setMeasure(measure); - plan.addMeasure(queryMeasure); - } - - private static void addQueryDimension(CarbonQueryPlan plan, int order, - CarbonDimension dimension) { - QueryDimension queryDimension = new QueryDimension(dimension.getColName()); - queryDimension.setQueryOrder(order); - queryDimension.setDimension(dimension); - plan.addDimension(queryDimension); - } - public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable, boolean[] isFilterDimensions, boolean[] isFilterMeasures) { QueryModel.processFilterExpression(carbonTable, filterExpression, isFilterDimensions, @@ -130,7 +79,7 @@ public class CarbonInputFormatUtil { if (null != filterExpression) { // Optimize Filter Expression and fit RANGE filters is conditions apply. FilterOptimizer rangeFilterOptimizer = - new RangeFilterOptmizer(new FilterOptimizerBasic(), filterExpression); + new RangeFilterOptmizer(filterExpression); rangeFilterOptimizer.optimizeFilter(); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java index f109e1c..1b57f93 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java @@ -30,7 +30,6 @@ import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.SingleTableProvider; import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.util.DataTypeConverterImpl; import org.apache.carbondata.hadoop.CarbonInputSplit; @@ -140,11 +139,11 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); - String projection = getProjection(configuration, carbonTable, + String projectionString = getProjection(configuration, carbonTable, identifier.getCarbonTableIdentifier().getTableName()); - CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, 
projection); - QueryModel queryModel = - QueryModel.createModel(identifier, queryPlan, carbonTable, new DataTypeConverterImpl()); + String[] projectionColumns = projectionString.split(","); + QueryModel queryModel = carbonTable.createQueryWithProjection( + projectionColumns, new DataTypeConverterImpl()); // set the filter to the query model in order to filter blocklet before scan Expression filter = getFilterPredicates(configuration); CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null); http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java deleted file mode 100644 index 9a8f8c5..0000000 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.presto; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.datastore.block.TableBlockInfo; -import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; -import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.datatype.StructField; -import org.apache.carbondata.core.metadata.encoder.Encoding; -import org.apache.carbondata.core.scan.executor.QueryExecutor; -import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; -import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator; -import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; -import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.hadoop.AbstractRecordReader; -import org.apache.carbondata.hadoop.CarbonInputSplit; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; - -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the - * carbondata column APIs and fills the data directly into columns. - */ -class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> { - - private int batchIdx = 0; - - private int numBatched = 0; - - private CarbonVectorBatch columnarBatch; - - private CarbonColumnarBatch carbonColumnarBatch; - - /** - * If true, this class returns batches instead of rows. - */ - private boolean returnColumnarBatch; - - private QueryModel queryModel; - - private AbstractDetailQueryResultIterator iterator; - - private QueryExecutor queryExecutor; - - public CarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryModel queryModel, AbstractDetailQueryResultIterator iterator) { - this.queryModel = queryModel; - this.iterator = iterator; - this.queryExecutor = queryExecutor; - enableReturningBatches(); - } - - /** - * Implementation of RecordReader API. - */ - @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException, UnsupportedOperationException { - // The input split can contain single HDFS block or multiple blocks, so firstly get all the - // blocks and then set them in the query model. - List<CarbonInputSplit> splitList; - if (inputSplit instanceof CarbonInputSplit) { - splitList = new ArrayList<>(1); - splitList.add((CarbonInputSplit) inputSplit); - } else if (inputSplit instanceof CarbonMultiBlockSplit) { - // contains multiple blocks, this is an optimization for concurrent query. 
- CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit; - splitList = multiBlockSplit.getAllSplits(); - } else { - throw new RuntimeException("unsupported input split type: " + inputSplit); - } - List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList); - queryModel.setTableBlockInfos(tableBlockInfoList); - queryModel.setVectorReader(true); - try { - queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); - iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel); - } catch (QueryExecutionException e) { - throw new InterruptedException(e.getMessage()); - } - } - - @Override public void close() throws IOException { - logStatistics(rowCount, queryModel.getStatisticsRecorder()); - if (columnarBatch != null) { - columnarBatch = null; - } - // clear dictionary cache - Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping(); - if (null != columnToDictionaryMapping) { - for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) { - CarbonUtil.clearDictionaryCache(entry.getValue()); - } - } - try { - queryExecutor.finish(); - } catch (QueryExecutionException e) { - throw new IOException(e); - } - } - - @Override public boolean nextKeyValue() throws IOException, InterruptedException { - resultBatch(); - - if (returnColumnarBatch) return nextBatch(); - - if (batchIdx >= numBatched) { - if (!nextBatch()) return false; - } - ++batchIdx; - return true; - } - - @Override public Object getCurrentValue() throws IOException, InterruptedException { - if (returnColumnarBatch) { - rowCount += columnarBatch.numValidRows(); - return columnarBatch; - } else { - return null; - } - } - - @Override public Void getCurrentKey() throws IOException, InterruptedException { - return null; - } - - @Override public float getProgress() throws IOException, InterruptedException { - // TODO : Implement it based on total number of rows it is going to retrive. - return 0; - } - - /** - * Returns the ColumnarBatch object that will be used for all rows returned by this reader. - * This object is reused. Calling this enables the vectorized reader. This should be called - * before any calls to nextKeyValue/nextBatch. 
- */ - - private void initBatch() { - List<QueryDimension> queryDimension = queryModel.getQueryDimension(); - List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures(); - StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()]; - for (int i = 0; i < queryDimension.size(); i++) { - QueryDimension dim = queryDimension.get(i); - if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { - DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory - .getDirectDictionaryGenerator(dim.getDimension().getDataType()); - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), - generator.getReturnType()); - } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) { - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), - dim.getDimension().getDataType()); - } else if (dim.getDimension().isComplex()) { - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), - dim.getDimension().getDataType()); - } else { - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), - DataTypes.INT); - } - } - - for (QueryMeasure msr : queryMeasures) { - DataType dataType = msr.getMeasure().getDataType(); - if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT - || dataType == DataTypes.LONG) { - fields[msr.getQueryOrder()] = - new StructField(msr.getColumnName(), msr.getMeasure().getDataType()); - } else if (DataTypes.isDecimal(dataType)) { - fields[msr.getQueryOrder()] = - new StructField(msr.getColumnName(), msr.getMeasure().getDataType()); - } else { - fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE); - } - } - - columnarBatch = CarbonVectorBatch.allocate(fields); - CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length]; - boolean[] filteredRows = new boolean[columnarBatch.capacity()]; - for (int i = 0; i < fields.length; i++) { - vectors[i] = new CarbonColumnVectorWrapper(columnarBatch.column(i), filteredRows); - } - carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows); - } - - - private CarbonVectorBatch resultBatch() { - if (columnarBatch == null) initBatch(); - return columnarBatch; - } - - /* - * Can be called before any rows are returned to enable returning columnar batches directly. - */ - private void enableReturningBatches() { - returnColumnarBatch = true; - } - - /** - * Advances to the next batch of rows. Returns false if there are no more. 
- */ - private boolean nextBatch() { - columnarBatch.reset(); - carbonColumnarBatch.reset(); - if (iterator.hasNext()) { - iterator.processNextBatch(carbonColumnarBatch); - int actualSize = carbonColumnarBatch.getActualSize(); - columnarBatch.setNumRows(actualSize); - numBatched = actualSize; - batchIdx = 0; - return true; - } - return false; - } - - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java index 1679f29..5f1f90a 100644 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java @@ -54,7 +54,7 @@ class CarbondataPageSource implements ConnectorPageSource { private final List<Type> types; private final PageBuilder pageBuilder; private boolean closed; - private CarbonVectorizedRecordReader vectorReader; + private PrestoCarbonVectorizedRecordReader vectorReader; private CarbonDictionaryDecodeReadSupport<Object[]> readSupport; private long sizeOfData = 0; http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java index c614fa9..5772fbf 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java @@ -54,7 +54,7 @@ public class CarbondataRecordCursor implements RecordCursor { private CarbondataSplit split; private CarbonDictionaryDecodeReadSupport readSupport; private Tuple3<DataType, Dictionary, Int>[] dictionary; - CarbonVectorizedRecordReader vectorizedRecordReader; + PrestoCarbonVectorizedRecordReader vectorizedRecordReader; private long totalBytes; private long nanoStart; @@ -63,7 +63,7 @@ public class CarbondataRecordCursor implements RecordCursor { public CarbondataRecordCursor(CarbonDictionaryDecodeReadSupport readSupport, - CarbonVectorizedRecordReader vectorizedRecordReader, + PrestoCarbonVectorizedRecordReader vectorizedRecordReader, List<CarbondataColumnHandle> columnHandles, CarbondataSplit split) { this.vectorizedRecordReader = vectorizedRecordReader; @@ -194,7 +194,7 @@ public class CarbondataRecordCursor implements RecordCursor { //todo delete cache from readSupport } - public CarbonVectorizedRecordReader getVectorizedRecordReader() { + public PrestoCarbonVectorizedRecordReader getVectorizedRecordReader() { return vectorizedRecordReader; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java index 0f8fe87..286ff0e 100755 --- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java @@ -78,8 +78,8 @@ public class CarbondataRecordSet implements RecordSet { readSupport .initialize(queryModel.getProjectionColumns(), queryModel.getTable()); CarbonIterator iterator = queryExecutor.execute(queryModel); - CarbonVectorizedRecordReader vectorReader = - new CarbonVectorizedRecordReader(queryExecutor, queryModel, + PrestoCarbonVectorizedRecordReader vectorReader = + new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel, (AbstractDetailQueryResultIterator) iterator); return new CarbondataRecordCursor(readSupport, vectorReader, columns, split); } catch (QueryExecutionException e) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java index f039daf..5a2f831 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java @@ -105,7 +105,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0)); CarbonInputSplit carbonInputSplit = CarbonLocalInputSplit.convertSplit(carbondataSplit.getLocalInputSplit()); - queryModel = carbonTableInputFormat.getQueryModel(carbonInputSplit, hadoopAttemptContext); + queryModel = carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext); queryModel.setVectorReader(true); } catch (IOException e) { throw new RuntimeException("Unable to get the Query Model ", e); http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java new file mode 100644 index 0000000..a1907db --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoCarbonVectorizedRecordReader.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.presto; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.scan.executor.QueryExecutor; +import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.model.ProjectionDimension; +import org.apache.carbondata.core.scan.model.ProjectionMeasure; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.hadoop.AbstractRecordReader; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; + +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * A specialized RecordReader that reads into ColumnarBatches directly, using the + * CarbonData column APIs to fill the data straight into the column vectors. + */ +class PrestoCarbonVectorizedRecordReader extends AbstractRecordReader<Object> { + + private int batchIdx = 0; + + private int numBatched = 0; + + private CarbonVectorBatch columnarBatch; + + private CarbonColumnarBatch carbonColumnarBatch; + + /** + * If true, this class returns batches instead of rows. + */ + private boolean returnColumnarBatch; + + private QueryModel queryModel; + + private AbstractDetailQueryResultIterator iterator; + + private QueryExecutor queryExecutor; + + public PrestoCarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryModel queryModel, AbstractDetailQueryResultIterator iterator) { + this.queryModel = queryModel; + this.iterator = iterator; + this.queryExecutor = queryExecutor; + enableReturningBatches(); + } + + /** + * Implementation of RecordReader API. + */ + @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException, UnsupportedOperationException { + // The input split can contain a single HDFS block or multiple blocks, so first get all the + // blocks and then set them in the query model. + List<CarbonInputSplit> splitList; + if (inputSplit instanceof CarbonInputSplit) { + splitList = new ArrayList<>(1); + splitList.add((CarbonInputSplit) inputSplit); + } else if (inputSplit instanceof CarbonMultiBlockSplit) { + // contains multiple blocks; this is an optimization for concurrent queries.
+ CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit; + splitList = multiBlockSplit.getAllSplits(); + } else { + throw new RuntimeException("unsupported input split type: " + inputSplit); + } + List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList); + queryModel.setTableBlockInfos(tableBlockInfoList); + queryModel.setVectorReader(true); + try { + queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); + iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel); + } catch (QueryExecutionException e) { + throw new InterruptedException(e.getMessage()); + } + } + + @Override public void close() throws IOException { + logStatistics(rowCount, queryModel.getStatisticsRecorder()); + if (columnarBatch != null) { + columnarBatch = null; + } + // clear dictionary cache + Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping(); + if (null != columnToDictionaryMapping) { + for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) { + CarbonUtil.clearDictionaryCache(entry.getValue()); + } + } + try { + queryExecutor.finish(); + } catch (QueryExecutionException e) { + throw new IOException(e); + } + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + resultBatch(); + + if (returnColumnarBatch) return nextBatch(); + + if (batchIdx >= numBatched) { + if (!nextBatch()) return false; + } + ++batchIdx; + return true; + } + + @Override public Object getCurrentValue() throws IOException, InterruptedException { + if (returnColumnarBatch) { + rowCount += columnarBatch.numValidRows(); + return columnarBatch; + } else { + return null; + } + } + + @Override public Void getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override public float getProgress() throws IOException, InterruptedException { + // TODO: implement it based on the total number of rows it is going to retrieve. + return 0; + } + + /** + * Returns the ColumnarBatch object that will be used for all rows returned by this reader. + * This object is reused. Calling this enables the vectorized reader. This should be called + * before any calls to nextKeyValue/nextBatch.
+ */ + + private void initBatch() { + List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions(); + List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures(); + StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()]; + for (int i = 0; i < queryDimension.size(); i++) { + ProjectionDimension dim = queryDimension.get(i); + if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { + DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(dim.getDimension().getDataType()); + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), + generator.getReturnType()); + } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) { + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), + dim.getDimension().getDataType()); + } else if (dim.getDimension().isComplex()) { + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), + dim.getDimension().getDataType()); + } else { + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), + DataTypes.INT); + } + } + + for (ProjectionMeasure msr : queryMeasures) { + DataType dataType = msr.getMeasure().getDataType(); + if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT + || dataType == DataTypes.LONG) { + fields[msr.getOrdinal()] = + new StructField(msr.getColumnName(), msr.getMeasure().getDataType()); + } else if (DataTypes.isDecimal(dataType)) { + fields[msr.getOrdinal()] = + new StructField(msr.getColumnName(), msr.getMeasure().getDataType()); + } else { + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE); + } + } + + columnarBatch = CarbonVectorBatch.allocate(fields); + CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length]; + boolean[] filteredRows = new boolean[columnarBatch.capacity()]; + for (int i = 0; i < fields.length; i++) { + vectors[i] = new CarbonColumnVectorWrapper(columnarBatch.column(i), filteredRows); + } + carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows); + } + + + private CarbonVectorBatch resultBatch() { + if (columnarBatch == null) initBatch(); + return columnarBatch; + } + + /* + * Can be called before any rows are returned to enable returning columnar batches directly. + */ + private void enableReturningBatches() { + returnColumnarBatch = true; + } + + /** + * Advances to the next batch of rows. Returns false if there are no more. 
+ */ + private boolean nextBatch() { + columnarBatch.reset(); + carbonColumnarBatch.reset(); + if (iterator.hasNext()) { + iterator.processNextBatch(carbonColumnarBatch); + int actualSize = carbonColumnarBatch.getActualSize(); + columnarBatch.setNumRows(actualSize); + numBatched = actualSize; + batchIdx = 0; + return true; + } + return false; + } + + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index ab3ab5d..3c70619 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -171,7 +171,8 @@ class CarbonMergerRDD[K, V]( LOGGER.info(s"Restructured block exists: $restructuredBlockExists") DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl) exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, - carbonTable, dataFileMetadataSegMapping, restructuredBlockExists) + carbonTable, dataFileMetadataSegMapping, restructuredBlockExists, + new SparkDataTypeConverterImpl) // fire a query and get the results. var result2: java.util.List[RawResultIterator] = null http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 772f702..97be1fb 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -336,7 +336,7 @@ class CarbonScanRDD( TaskMetricsMap.getInstance().registerThreadCallback() inputMetricsStats.initBytesReadCallback(context, inputSplit) val iterator = if (inputSplit.getAllSplits.size() > 0) { - val model = format.getQueryModel(inputSplit, attemptContext) + val model = format.createQueryModel(inputSplit, attemptContext) // get RecordReader by FileFormat val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match { case FileFormat.ROW_V1 => http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java index 7d42130..432d50a 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java @@ -40,7 +40,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector { private DataType blockDataType; - public ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) { + 
ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) { this.columnVector = columnVector; this.filteredRows = filteredRows; this.dataType = CarbonScalaUtil.convertSparkToCarbonDataType(columnVector.dataType()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java index 5d927df..73da878 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java @@ -35,8 +35,8 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; +import org.apache.carbondata.core.scan.model.ProjectionDimension; +import org.apache.carbondata.core.scan.model.ProjectionMeasure; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; @@ -100,7 +100,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { /** * Implementation of RecordReader API. */ - @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException, UnsupportedOperationException { // The input split can contain single HDFS block or multiple blocks, so firstly get all the // blocks and then set them in the query model. 
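The new PrestoCarbonVectorizedRecordReader and the Spark VectorizedCarbonRecordReader (whose diff continues below) share the same batch protocol: enableReturningBatches() switches the reader into columnar mode (the Presto copy does this in its constructor), nextKeyValue() then advances one whole vector batch per call instead of one row, and getCurrentValue() returns the reused batch object. A minimal caller sketch, assuming the QueryModel/executor wiring shown in CarbondataRecordSet above; consumeBatch(...) is a hypothetical placeholder and exception handling is elided:

  // Sketch only, not part of this commit: drives PrestoCarbonVectorizedRecordReader
  // the way CarbondataRecordCursor does.
  QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
  AbstractDetailQueryResultIterator iterator =
      (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
  PrestoCarbonVectorizedRecordReader reader =
      new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel, iterator);
  try {
    while (reader.nextKeyValue()) {                  // loads one CarbonVectorBatch per call
      CarbonVectorBatch batch = (CarbonVectorBatch) reader.getCurrentValue();
      consumeBatch(batch, batch.numValidRows());     // numValidRows() excludes filtered rows
    }
  } finally {
    reader.close();  // logs statistics, clears dictionary caches, finishes the executor
  }

Note that close() must run even on failure: it is what releases the dictionary cache entries and calls queryExecutor.finish().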
@@ -145,7 +146,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { } } - @Override public void close() throws IOException { + @Override + public void close() throws IOException { logStatistics(rowCount, queryModel.getStatisticsRecorder()); if (columnarBatch != null) { columnarBatch.close(); @@ -165,10 +167,13 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { } } - @Override public boolean nextKeyValue() throws IOException, InterruptedException { + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { resultBatch(); - if (returnColumnarBatch) return nextBatch(); + if (returnColumnarBatch) { + return nextBatch(); + } if (batchIdx >= numBatched) { if (!nextBatch()) return false; @@ -177,7 +182,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { return true; } - @Override public Object getCurrentValue() throws IOException, InterruptedException { + @Override + public Object getCurrentValue() throws IOException, InterruptedException { if (returnColumnarBatch) { int value = columnarBatch.numValidRows(); rowCount += value; @@ -190,11 +196,13 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { return columnarBatch.getRow(batchIdx - 1); } - @Override public Void getCurrentKey() throws IOException, InterruptedException { + @Override + public Void getCurrentKey() throws IOException, InterruptedException { return null; } - @Override public float getProgress() throws IOException, InterruptedException { + @Override + public float getProgress() throws IOException, InterruptedException { // TODO : Implement it based on total number of rows it is going to retrive. return 0; } @@ -206,44 +214,44 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { */ private void initBatch(MemoryMode memMode) { - List<QueryDimension> queryDimension = queryModel.getQueryDimension(); - List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures(); + List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions(); + List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures(); StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()]; for (int i = 0; i < queryDimension.size(); i++) { - QueryDimension dim = queryDimension.get(i); + ProjectionDimension dim = queryDimension.get(i); if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory .getDirectDictionaryGenerator(dim.getDimension().getDataType()); - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null); } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) { - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true, null); } else if (dim.getDimension().isComplex()) { - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true, null); } else { - fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(), + fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), 
CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.INT), true, null); } } for (int i = 0; i < queryMeasures.size(); i++) { - QueryMeasure msr = queryMeasures.get(i); + ProjectionMeasure msr = queryMeasures.get(i); DataType dataType = msr.getMeasure().getDataType(); if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) { - fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(), + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true, null); } else if (DataTypes.isDecimal(dataType)) { - fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(), + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true, null); } else { - fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(), + fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null); } } @@ -261,9 +269,8 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { initBatch(DEFAULT_MEMORY_MODE); } - private ColumnarBatch resultBatch() { + private void resultBatch() { if (columnarBatch == null) initBatch(); - return columnarBatch; } /*