http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java index 56e83db..b953d45 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java @@ -17,34 +17,41 @@ package org.apache.carbondata.core.util; -import mockit.Mock; -import mockit.MockUp; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashSet; +import java.util.List; +import java.util.Set; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO; -import org.apache.carbondata.core.metadata.index.BlockIndexInfo; +import org.apache.carbondata.core.datastore.page.EncodedTablePage; +import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage; import org.apache.carbondata.core.metadata.BlockletInfoColumnar; +import org.apache.carbondata.core.metadata.CodecMetaFactory; +import org.apache.carbondata.core.metadata.ColumnPageCodecMeta; import org.apache.carbondata.core.metadata.ValueEncoderMeta; -import org.apache.carbondata.format.*; +import org.apache.carbondata.core.metadata.index.BlockIndexInfo; +import org.apache.carbondata.format.BlockIndex; +import org.apache.carbondata.format.BlockletInfo; import org.apache.carbondata.format.BlockletMinMaxIndex; import org.apache.carbondata.format.ColumnSchema; +import org.apache.carbondata.format.DataChunk; import org.apache.carbondata.format.DataType; +import org.apache.carbondata.format.Encoding; +import org.apache.carbondata.format.FileFooter; +import org.apache.carbondata.format.IndexHeader; +import org.apache.carbondata.format.SegmentInfo; +import mockit.Mock; +import mockit.MockUp; import org.junit.BeforeClass; import org.junit.Test; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import static junit.framework.TestCase.*; -import static org.apache.carbondata.core.util.CarbonMetadataUtil.getIndexHeader; +import static junit.framework.TestCase.assertEquals; import static org.apache.carbondata.core.util.CarbonMetadataUtil.convertFileFooter; import static org.apache.carbondata.core.util.CarbonMetadataUtil.getBlockIndexInfo; +import static org.apache.carbondata.core.util.CarbonMetadataUtil.getIndexHeader; public class CarbonMetadataUtilTest { static List<ByteBuffer> byteBufferList; @@ -57,8 +64,6 @@ public class CarbonMetadataUtilTest { static int[] objDecimal; @BeforeClass public static void setUp() { - Long lngObj = new Long("11221"); - byte byt = 1; objMaxArr = new Long[6]; objMaxArr[0] = new Long("111111"); objMaxArr[1] = new Long("121111"); @@ -113,13 +118,11 @@ public class CarbonMetadataUtilTest { blockletInfoList.add(blockletInfo); blockletInfoList.add(blockletInfo); - ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta(); - valueEncoderMeta.setDecimal(5); - valueEncoderMeta.setMinValue(objMinArr); - valueEncoderMeta.setMaxValue(objMaxArr); - valueEncoderMeta.setUniqueValue(lngObj); - valueEncoderMeta.setType('a'); - valueEncoderMeta.setDataTypeSelected(byt); + ValueEncoderMeta meta = CodecMetaFactory.createMeta(); + meta.setDecimal(5); + meta.setMinValue(objMinArr); + meta.setMaxValue(objMaxArr); + meta.setType(ColumnPageCodecMeta.DOUBLE_MEASURE); List<Encoding> encoders = new ArrayList<>(); encoders.add(Encoding.INVERTED_INDEX); @@ -199,19 +202,52 @@ public class CarbonMetadataUtilTest { ValueEncoderMeta[] metas = new ValueEncoderMeta[6]; for (int i = 0; i < metas.length; i++) { - metas[i] = new ValueEncoderMeta(); + metas[i] = CodecMetaFactory.createMeta(); metas[i].setMinValue(objMinArr[i]); metas[i].setMaxValue(objMaxArr[i]); - metas[i].setUniqueValue(objMinArr[i]); metas[i].setDecimal(objDecimal[i]); - metas[i].setType(CarbonCommonConstants.BIG_INT_MEASURE); - metas[i].setDataTypeSelected(byteArr[i]); + metas[i].setType(ColumnPageCodecMeta.BIG_INT_MEASURE); } - MeasurePageStatsVO stats = MeasurePageStatsVO.build(metas); - BlockletInfoColumnar blockletInfoColumnar = new BlockletInfoColumnar(); + final ValueEncoderMeta meta = CodecMetaFactory.createMeta(); + + new MockUp<ColumnPageCodecMeta>() { + @SuppressWarnings("unused") @Mock + public byte[] serialize() { + return new byte[]{1,2}; + } + @SuppressWarnings("unused") @Mock + public byte[] getMaxAsBytes() { + return new byte[]{1,2}; + } + @SuppressWarnings("unused") @Mock + public byte[] getMinAsBytes() { + return new byte[]{1,2}; + } + @SuppressWarnings("unused") @Mock + public org.apache.carbondata.core.metadata.datatype.DataType getSrcDataType() { + return org.apache.carbondata.core.metadata.datatype.DataType.DOUBLE; + } + }; + + new MockUp<EncodedMeasurePage>() { + @SuppressWarnings("unused") @Mock + public ValueEncoderMeta getMetaData() { + return meta; + } + }; + + final EncodedMeasurePage measure = new EncodedMeasurePage(6, new byte[]{0,1}, meta, + new BitSet()); + new MockUp<EncodedTablePage>() { + @SuppressWarnings("unused") @Mock + public EncodedMeasurePage getMeasure(int measureIndex) { + return measure; + } + }; + BitSet[] bitSetArr = new BitSet[6]; bitSetArr[0] = new BitSet(); bitSetArr[1] = new BitSet(); @@ -222,7 +258,6 @@ public class CarbonMetadataUtilTest { blockletInfoColumnar.setColumnMaxData(maxByteArr); blockletInfoColumnar.setColumnMinData(maxByteArr); blockletInfoColumnar.setKeyLengths(intArr); - blockletInfoColumnar.setColGrpBlocks(boolArr); blockletInfoColumnar.setKeyOffSets(longArr); blockletInfoColumnar.setDataIndexMapOffsets(longArr); blockletInfoColumnar.setAggKeyBlock(boolArr); @@ -232,7 +267,8 @@ public class CarbonMetadataUtilTest { blockletInfoColumnar.setMeasureLength(intArr); blockletInfoColumnar.setMeasureOffset(longArr); blockletInfoColumnar.setMeasureNullValueIndex(bitSetArr); - blockletInfoColumnar.setStats(stats); + EncodedTablePage encodedTablePage = EncodedTablePage.newEmptyInstance(); + blockletInfoColumnar.setEncodedTablePage(encodedTablePage); BlockletInfoColumnar blockletInfoColumnar1 = new BlockletInfoColumnar(); blockletInfoColumnar1.setColumnMaxData(maxByteArr); @@ -243,13 +279,11 @@ public class CarbonMetadataUtilTest { blockletInfoColumnar1.setAggKeyBlock(boolArr); blockletInfoColumnar1.setDataIndexMapLength(intArr); blockletInfoColumnar1.setIsSortedKeyColumn(boolArr); - blockletInfoColumnar1.setColGrpBlocks(boolArr); blockletInfoColumnar1.setKeyOffSets(longArr); blockletInfoColumnar1.setMeasureLength(intArr); blockletInfoColumnar1.setMeasureOffset(longArr); blockletInfoColumnar1.setMeasureNullValueIndex(bitSetArr); - blockletInfoColumnar1.setStats(stats); - blockletInfoColumnar1.setColGrpBlocks(boolArr); + blockletInfoColumnar1.setEncodedTablePage(encodedTablePage); List<BlockletInfoColumnar> blockletInfoColumnarList = new ArrayList<>(); blockletInfoColumnarList.add(blockletInfoColumnar); @@ -285,7 +319,7 @@ public class CarbonMetadataUtilTest { BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex(); blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(byteMaxArr)); blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(byteMinArr)); - FileFooter result = convertFileFooter(blockletInfoColumnarList, 4, cardinality, columnSchemas, + FileFooter result = convertFileFooter(blockletInfoColumnarList, cardinality, columnSchemas, segmentProperties); assertEquals(result.getTable_columns(), columnSchemas);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java index 755308e..f5d7dc7 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java @@ -50,14 +50,6 @@ public class DataTypeUtilTest { } - @Test public void testGetAggType() { - assertTrue(getAggType(DataType.DECIMAL) == 'b'); - assertTrue(getAggType(DataType.INT) == 'd'); - assertTrue(getAggType(DataType.LONG) == 'd'); - assertTrue(getAggType(DataType.NULL) == 'n'); - - } - @Test public void testBigDecimalToByte() { byte[] result = bigDecimalToByte(BigDecimal.ONE); assertTrue(result == result); http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java index 64651e5..5fc6df9 100644 --- a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java +++ b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java @@ -24,21 +24,26 @@ import java.util.BitSet; import java.util.List; import java.util.UUID; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO; -import org.apache.carbondata.core.metadata.ValueEncoderMeta; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.datastore.page.EncodedTablePage; +import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage; import org.apache.carbondata.core.metadata.BlockletInfoColumnar; +import org.apache.carbondata.core.metadata.CodecMetaFactory; +import org.apache.carbondata.core.metadata.ColumnPageCodecMeta; +import org.apache.carbondata.core.metadata.ValueEncoderMeta; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.reader.CarbonFooterReader; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.format.ColumnSchema; import junit.framework.TestCase; -import org.apache.carbondata.format.ColumnSchema; +import mockit.Mock; +import mockit.MockUp; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -87,9 +92,7 @@ public class CarbonFooterWriterTest extends TestCase{ int[] colCardinality = CarbonUtil.getFormattedCardinality(cardinalities, wrapperColumnSchema); SegmentProperties segmentProperties = new SegmentProperties(wrapperColumnSchema, colCardinality); writer.writeFooter(CarbonMetadataUtil.convertFileFooter( - infoColumnars, - 6, - cardinalities,columnSchema, segmentProperties + infoColumnars, cardinalities,columnSchema, segmentProperties ), 0); CarbonFooterReader metaDataReader = new CarbonFooterReader(filePath, 0); @@ -125,41 +128,7 @@ public class CarbonFooterWriterTest extends TestCase{ return dimColumn; } - /** - * test writing fact metadata. - */ - @Test public void testReadFactMetadata() throws IOException { - deleteFile(); - createFile(); - CarbonFooterWriter writer = new CarbonFooterWriter(filePath); - List<BlockletInfoColumnar> infoColumnars = getBlockletInfoColumnars(); - int[] cardinalities = new int[] { 2, 4, 5, 7, 9, 10}; - List<ColumnSchema> columnSchema = Arrays.asList(new ColumnSchema[]{getDimensionColumn("IMEI1"), - getDimensionColumn("IMEI2"), - getDimensionColumn("IMEI3"), - getDimensionColumn("IMEI4"), - getDimensionColumn("IMEI5"), - getDimensionColumn("IMEI6")}); - List<org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema> wrapperColumnSchema = Arrays.asList(new org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema[]{getWrapperDimensionColumn("IMEI1"), - getWrapperDimensionColumn("IMEI2"), - getWrapperDimensionColumn("IMEI3"), - getWrapperDimensionColumn("IMEI4"), - getWrapperDimensionColumn("IMEI5"), - getWrapperDimensionColumn("IMEI6")}); - int[] colCardinality = CarbonUtil.getFormattedCardinality(cardinalities, wrapperColumnSchema); - SegmentProperties segmentProperties = new SegmentProperties(wrapperColumnSchema, cardinalities); - writer.writeFooter(CarbonMetadataUtil - .convertFileFooter(infoColumnars, 6, colCardinality, - columnSchema,segmentProperties), 0); - - CarbonFooterReader metaDataReader = new CarbonFooterReader(filePath, 0); - List<BlockletInfoColumnar> nodeInfoColumnars = - CarbonMetadataUtil.convertBlockletInfo(metaDataReader.readFooter()); - - assertTrue(nodeInfoColumnars.size() == infoColumnars.size()); - } - - private List<BlockletInfoColumnar> getBlockletInfoColumnars() { + private List<BlockletInfoColumnar> getBlockletInfoColumnars() throws IOException { BlockletInfoColumnar infoColumnar = new BlockletInfoColumnar(); infoColumnar.setStartKey(new byte[] { 1, 2, 3 }); infoColumnar.setEndKey(new byte[] { 8, 9, 10 }); @@ -179,27 +148,47 @@ public class CarbonFooterWriterTest extends TestCase{ infoColumnar.setMeasureLength(new int[] { 6, 7 }); infoColumnar.setMeasureOffset(new long[] { 33, 99 }); infoColumnar.setAggKeyBlock(new boolean[] { true, true, true, true }); - infoColumnar.setColGrpBlocks(new boolean[] { false, false, false, false }); infoColumnar.setMeasureNullValueIndex(new BitSet[] {new BitSet(),new BitSet()}); + infoColumnar.setEncodedTablePage(EncodedTablePage.newEmptyInstance()); + + final ValueEncoderMeta meta = CodecMetaFactory.createMeta(); + + new MockUp<ColumnPageCodecMeta>() { + @SuppressWarnings("unused") @Mock + public byte[] serialize() { + return new byte[]{1,2}; + } + @SuppressWarnings("unused") @Mock + public byte[] getMaxAsBytes() { + return new byte[]{1,2}; + } + @SuppressWarnings("unused") @Mock + public byte[] getMinAsBytes() { + return new byte[]{1,2}; + } + @SuppressWarnings("unused") @Mock + public DataType getSrcDataType() { + return DataType.DOUBLE; + } + + }; + + new MockUp<EncodedMeasurePage>() { + @SuppressWarnings("unused") @Mock + public ValueEncoderMeta getMetaData() { + return meta; + } + }; + + final EncodedMeasurePage measure = new EncodedMeasurePage(2, new byte[]{0,1}, meta, + new BitSet()); + new MockUp<EncodedTablePage>() { + @SuppressWarnings("unused") @Mock + public EncodedMeasurePage getMeasure(int measureIndex) { + return measure; + } + }; - ValueEncoderMeta[] metas = new ValueEncoderMeta[2]; - metas[0] = new ValueEncoderMeta(); - metas[0].setMinValue(0); - metas[0].setMaxValue(44d); - metas[0].setUniqueValue(0d); - metas[0].setDecimal(0); - metas[0].setType(CarbonCommonConstants.DOUBLE_MEASURE); - metas[0].setDataTypeSelected((byte)0); - metas[1] = new ValueEncoderMeta(); - metas[1].setMinValue(0); - metas[1].setMaxValue(55d); - metas[1].setUniqueValue(0d); - metas[1].setDecimal(0); - metas[1].setType(CarbonCommonConstants.DOUBLE_MEASURE); - metas[1].setDataTypeSelected((byte)0); - - MeasurePageStatsVO stats = MeasurePageStatsVO.build(metas); - infoColumnar.setStats(stats); List<BlockletInfoColumnar> infoColumnars = new ArrayList<BlockletInfoColumnar>(); infoColumnars.add(infoColumnar); return infoColumnars; http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/format/src/main/thrift/carbondata.thrift ---------------------------------------------------------------------- diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift index b4cbc4e..8acd0b1 100644 --- a/format/src/main/thrift/carbondata.thrift +++ b/format/src/main/thrift/carbondata.thrift @@ -119,7 +119,7 @@ struct DataChunk{ * in Row Major format. * * For V3, one data chunk is one page data of 32K rows. - * For V2 & V1, one data chunk is one blocklet data. + * For V2, one data chunk is one blocklet data. */ struct DataChunk2{ 1: required ChunkCompressionMeta chunk_meta; // The metadata of a chunk http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java index 2980ad3..3ca8cf1 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java @@ -116,8 +116,8 @@ class CarbonHiveSerDe extends AbstractSerDe { @Override public Writable serialize(Object obj, ObjectInspector objectInspector) throws SerDeException { if (!objInspector.getCategory().equals(ObjectInspector.Category.STRUCT)) { - throw new SerDeException("Cannot serialize " + objInspector.getCategory() - + ". Can only serialize a struct"); + throw new SerDeException("Cannot serializeStartKey " + objInspector.getCategory() + + ". Can only serializeStartKey a struct"); } serializedSize += ((StructObjectInspector) objInspector).getAllStructFieldRefs().size(); status = LAST_OPERATION.SERIALIZE; http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java b/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java index be17823..757a342 100644 --- a/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java +++ b/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java @@ -87,7 +87,7 @@ // assertEquals("deserialization gives the wrong object", t, row); // // // Serialize -// final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi); +// final ArrayWritable serializedArr = (ArrayWritable) serDe.serializeStartKey(row, oi); // assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(), // serializedArr.get().length); // assertTrue("serialized object should be equal to starting object", http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv b/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv index 8c90d27..db4bca9 100644 --- a/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv +++ b/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv @@ -1,9 +1,9 @@ ID,date,country,name,phonetype,serialname,salary,complex -1.2,2015/7/23,china,aaa1,phone197,ASD69643,15000,3.113$3.33 -2,2015/7/24,china,aaa2,phone756,ASD42892,15001,3.123$7.33 -4.3,2015/7/26,china,aaa4,phone2435,ASD66902,15003,3.123$56.33 -5,2015/7/27,china,aaa5,phone2441,ASD90633,15004,3.133$5.33 -6.5,2015/7/28,china,aaa6,phone294,ASD59961,15005,3.133$54.33 -8,2015/7/30,china,aaa8,phone1848,ASD57308,15007,32.13$56.33 -9.1,2015/7/18,china,aaa9,phone706,ASD86717,15008,3.213$44.33 -10,2015/7/19,usa,aaa10,phone685,ASD30505,15009,32.13$33.33 \ No newline at end of file +1.2,2015/07/23,china,aaa1,phone197,ASD69643,15000,3.113$3.33 +2,2015/07/24,china,aaa2,phone756,ASD42892,15001,3.123$7.33 +4.3,2015/07/26,china,aaa4,phone2435,ASD66902,15003,3.123$56.33 +5,2015/07/27,china,aaa5,phone2441,ASD90633,15004,3.133$5.33 +6.5,2015/07/28,china,aaa6,phone294,ASD59961,15005,3.133$54.33 +8,2015/07/30,china,aaa8,phone1848,ASD57308,15007,32.13$56.33 +9.1,2015/07/18,china,aaa9,phone706,ASD86717,15008,3.213$44.33 +10,2015/07/19,usa,aaa10,phone685,ASD30505,15009,32.13$33.33 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv b/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv index ae67723..e8c023b 100644 --- a/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv +++ b/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv @@ -1,4 +1,4 @@ -1.2,2015/7/23,china,aaa1,phone197,ASD69643,15000,3.113:imei$3.33:imsi +1.2,2015-7-23 00:00:00,china,aaa1,phone197,ASD69643,15000,3.113:imei$3.33:imsi 2,2015/7/24,china,aaa2,phone756,ASD42892,15001,3.123:imei$7.33:imsi 4.3,2015/7/26,china,aaa4,phone2435,ASD66902,15003,3.123:imei$56.33:imsi 5,2015/7/27,china,aaa5,phone2441,ASD90633,15004,3.133:imei$5.33:imsi http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java b/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java deleted file mode 100644 index 70ac3d9..0000000 --- a/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore.columnar; - -import java.util.concurrent.Callable; - -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.columnar.IndexStorage; -import org.apache.carbondata.processing.store.colgroup.ColGroupDataHolder; -import org.apache.carbondata.processing.store.colgroup.ColGroupMinMax; - -/** - * it is holder of column group dataPage and also min max for colgroup block dataPage - */ -public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage> { - - private byte[][] dataPage; - - private ColGroupMinMax colGrpMinMax; - - public ColGroupBlockStorage(SegmentProperties segmentProperties, int colGrpIndex, - byte[][] dataPage) { - colGrpMinMax = new ColGroupMinMax(segmentProperties, colGrpIndex); - this.dataPage = dataPage; - for (int i = 0; i < dataPage.length; i++) { - colGrpMinMax.add(dataPage[i]); - } - } - - /** - * sorting is not required for colgroup storage and hence return true - */ - @Override public boolean isAlreadySorted() { - return true; - } - - /** - * for column group storage its not required - */ - public ColGroupDataHolder getRowIdPage() { - //not required for column group storage - return null; - } - - /** - * for column group storage its not required - */ - public ColGroupDataHolder getRowIdRlePage() { - // not required for column group storage - return null; - } - - /** - * for column group storage its not required - */ - public byte[][] getDataPage() { - return dataPage; - } - - /** - * for column group storage its not required - */ - public ColGroupDataHolder getDataRlePage() { - //not required for column group - return null; - } - - /** - * for column group storage its not required - */ - @Override public int getTotalSize() { - return dataPage.length; - } - - @Override public byte[] getMin() { - return colGrpMinMax.getMin(); - } - - @Override public byte[] getMax() { - return colGrpMinMax.getMax(); - } - - /** - * return self - */ - @Override public IndexStorage call() throws Exception { - return this; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java index 5b0685b..6427d02 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java @@ -21,8 +21,8 @@ import java.math.BigDecimal; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.NonDictionaryUtil; import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; -import org.apache.carbondata.processing.util.NonDictionaryUtil; public class SortStepRowUtil { public static Object[] convertRow(Object[] data, SortParameters parameters, http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java index 087b0c7..3940dae 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java @@ -176,7 +176,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { readCounter++; } } catch (Exception e) { - throw new CarbonDataLoadingException("unable to generate the mdkey", e); + throw new CarbonDataLoadingException(e); } rowCounter.getAndAdd(batch.getSize()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java index 6528d44..653da7b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java @@ -31,8 +31,8 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.NonDictionaryUtil; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; -import org.apache.carbondata.processing.util.NonDictionaryUtil; public class IntermediateFileMerger implements Callable<Void> { /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java index bc6640d..11c42a9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java @@ -22,7 +22,7 @@ import java.util.Comparator; import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; -import org.apache.carbondata.processing.util.NonDictionaryUtil; +import org.apache.carbondata.core.util.NonDictionaryUtil; public class RowComparator implements Comparator<Object[]> { /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java index 8d914ea..be29bf8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java @@ -18,7 +18,7 @@ package org.apache.carbondata.processing.sortandgroupby.sortdata; import java.util.Comparator; -import org.apache.carbondata.processing.util.NonDictionaryUtil; +import org.apache.carbondata.core.util.NonDictionaryUtil; /** * This class is used as comparator for comparing dims which are non high cardinality dims. http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java index 0f7ae1a..6abba0b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java @@ -35,8 +35,8 @@ import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.NonDictionaryUtil; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; -import org.apache.carbondata.processing.util.NonDictionaryUtil; public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> { http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java index d629cdc..51b3964 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java @@ -23,8 +23,8 @@ import java.io.IOException; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.NonDictionaryUtil; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; -import org.apache.carbondata.processing.util.NonDictionaryUtil; public class UnCompressedTempSortFileWriter extends AbstractTempSortFileWriter { http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 429c5a3..be49707 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -38,7 +38,7 @@ import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.datastore.GenericDataType; import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; -import org.apache.carbondata.core.datastore.page.encoding.EncodedData; +import org.apache.carbondata.core.datastore.page.EncodedTablePage; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter; @@ -51,7 +51,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.NodeHolder; import org.apache.carbondata.processing.newflow.sort.SortScopeOptions; import org.apache.carbondata.processing.store.file.FileManager; import org.apache.carbondata.processing.store.file.IFileManagerComposite; @@ -148,8 +147,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ private ColumnarFormatVersion version; - private TablePageEncoder encoder; - private SortScopeOptions.SortScope sortScope; /** @@ -201,7 +198,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } } this.version = CarbonProperties.getInstance().getFormatVersion(); - this.encoder = new TablePageEncoder(model); String noInvertedIdxCol = ""; for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) { if (!cd.isUseInvertedIndex()) { @@ -342,35 +338,26 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } /** - * generate the NodeHolder from the input rows (one page in case of V3 format) + * generate the EncodedTablePage from the input rows (one page in case of V3 format) */ - private NodeHolder processDataRows(List<CarbonRow> dataRows) + private EncodedTablePage processDataRows(List<CarbonRow> dataRows) throws CarbonDataWriterException, KeyGenException, MemoryException, IOException { if (dataRows.size() == 0) { - return new NodeHolder(); + return EncodedTablePage.newEmptyInstance(); } TablePage tablePage = new TablePage(model, dataRows.size()); - TablePageKey keys = new TablePageKey(model, dataRows.size()); int rowId = 0; // convert row to columnar data for (CarbonRow row : dataRows) { - tablePage.addRow(rowId, row); - keys.update(rowId, row); - rowId++; + tablePage.addRow(rowId++, row); } - // apply and compress dimensions and measure - EncodedData encodedData = encoder.encode(tablePage); - - TablePageStatistics tablePageStatistics = new TablePageStatistics( - model.getTableSpec(), tablePage, encodedData, tablePage.getMeasureStats()); - - NodeHolder nodeHolder = dataWriter.buildDataNodeHolder(encodedData, tablePageStatistics, keys); + EncodedTablePage encoded = tablePage.encode(); tablePage.freeMemory(); LOGGER.info("Number Of records processed: " + dataRows.size()); - return nodeHolder; + return encoded; } /** @@ -469,7 +456,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } consumerExecutorService.shutdownNow(); processWriteTaskSubmitList(consumerExecutorServiceTaskList); - this.dataWriter.writeBlockletInfoToFile(); + this.dataWriter.writeFooterToFile(); LOGGER.info("All blocklets have been finished writing"); // close all the open stream for both the files this.dataWriter.closeWriter(); @@ -665,7 +652,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { /** * array of blocklet data holder objects */ - private NodeHolder[] nodeHolders; + private EncodedTablePage[] encodedTablePages; /** * flag to check whether the producer has completed processing for holder * object which is required to be picked form an index @@ -677,7 +664,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { private int currentIndex; private BlockletDataHolder() { - nodeHolders = new NodeHolder[numberOfCores]; + encodedTablePages = new EncodedTablePage[numberOfCores]; available = new AtomicBoolean(false); } @@ -685,32 +672,32 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { * @return a node holder object * @throws InterruptedException if consumer thread is interrupted */ - public synchronized NodeHolder get() throws InterruptedException { - NodeHolder nodeHolder = nodeHolders[currentIndex]; + public synchronized EncodedTablePage get() throws InterruptedException { + EncodedTablePage encodedTablePage = encodedTablePages[currentIndex]; // if node holder is null means producer thread processing the data which has to // be inserted at this current index has not completed yet - if (null == nodeHolder && !processingComplete) { + if (null == encodedTablePage && !processingComplete) { available.set(false); } while (!available.get()) { wait(); } - nodeHolder = nodeHolders[currentIndex]; - nodeHolders[currentIndex] = null; + encodedTablePage = encodedTablePages[currentIndex]; + encodedTablePages[currentIndex] = null; currentIndex++; // reset current index when it reaches length of node holder array - if (currentIndex >= nodeHolders.length) { + if (currentIndex >= encodedTablePages.length) { currentIndex = 0; } - return nodeHolder; + return encodedTablePage; } /** - * @param nodeHolder + * @param encodedTablePage * @param index */ - public synchronized void put(NodeHolder nodeHolder, int index) { - nodeHolders[index] = nodeHolder; + public synchronized void put(EncodedTablePage encodedTablePage, int index) { + encodedTablePages[index] = encodedTablePage; // notify the consumer thread when index at which object is to be inserted // becomes equal to current index from where data has to be picked for writing if (index == currentIndex) { @@ -728,14 +715,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { private BlockletDataHolder blockletDataHolder; private List<CarbonRow> dataRows; private int sequenceNumber; - private boolean isWriteAll; + private boolean isLastPage; private Producer(BlockletDataHolder blockletDataHolder, List<CarbonRow> dataRows, - int sequenceNumber, boolean isWriteAll) { + int sequenceNumber, boolean isLastPage) { this.blockletDataHolder = blockletDataHolder; this.dataRows = dataRows; this.sequenceNumber = sequenceNumber; - this.isWriteAll = isWriteAll; + this.isLastPage = isLastPage; } /** @@ -746,11 +733,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ @Override public Void call() throws Exception { try { - NodeHolder nodeHolder = processDataRows(dataRows); - nodeHolder.setWriteAll(isWriteAll); + EncodedTablePage encodedTablePage = processDataRows(dataRows); + encodedTablePage.setIsLastPage(isLastPage); // insert the object in array according to sequence number int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores; - blockletDataHolder.put(nodeHolder, indexInNodeHolderArray); + blockletDataHolder.put(encodedTablePage, indexInNodeHolderArray); return null; } catch (Throwable throwable) { LOGGER.error(throwable, "Error in producer"); @@ -780,11 +767,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ @Override public Void call() throws Exception { while (!processingComplete || blockletProcessingCount.get() > 0) { - NodeHolder nodeHolder = null; + EncodedTablePage encodedTablePage = null; try { - nodeHolder = blockletDataHolder.get(); - if (null != nodeHolder) { - dataWriter.writeBlockletData(nodeHolder); + encodedTablePage = blockletDataHolder.get(); + if (null != encodedTablePage) { + dataWriter.writeTablePage(encodedTablePage); } blockletProcessingCount.decrementAndGet(); } catch (Throwable throwable) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java index f068400..4541ae9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java @@ -23,23 +23,45 @@ import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import org.apache.carbondata.core.datastore.DimensionType; import org.apache.carbondata.core.datastore.GenericDataType; +import org.apache.carbondata.core.datastore.TableSpec; +import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt; +import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt; +import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort; +import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort; +import org.apache.carbondata.core.datastore.columnar.IndexStorage; +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.datastore.page.ComplexColumnPage; -import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO; +import org.apache.carbondata.core.datastore.page.EncodedTablePage; +import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec; +import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy; +import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage; +import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage; +import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy; +import org.apache.carbondata.core.datastore.page.key.TablePageKey; +import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector; +import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult; +import org.apache.carbondata.core.datastore.page.statistics.VarLengthPageStatsCollector; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; import org.apache.spark.sql.types.Decimal; - /** * Represent a page data for all columns, we store its data in columnar layout, so that * all processing apply to TablePage can be done in vectorized fashion. @@ -56,24 +78,30 @@ public class TablePage { private ComplexColumnPage[] complexDimensionPage; private ColumnPage[] measurePage; - private MeasurePageStatsVO measurePageStatistics; - // the num of rows in this page, it must be less than short value (65536) private int pageSize; private CarbonFactDataHandlerModel model; + private TablePageKey key; + + private ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion(); + TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException { this.model = model; this.pageSize = pageSize; int numDictDimension = model.getMDKeyGenerator().getDimCount(); dictDimensionPage = new ColumnPage[numDictDimension]; for (int i = 0; i < dictDimensionPage.length; i++) { - dictDimensionPage[i] = ColumnPage.newVarLengthPath(DataType.BYTE_ARRAY, pageSize); + ColumnPage page = ColumnPage.newVarLengthPath(DataType.BYTE_ARRAY, pageSize); + page.setStatsCollector(VarLengthPageStatsCollector.newInstance()); + dictDimensionPage[i] = page; } noDictDimensionPage = new ColumnPage[model.getNoDictionaryCount()]; for (int i = 0; i < noDictDimensionPage.length; i++) { - noDictDimensionPage[i] = ColumnPage.newVarLengthPath(DataType.BYTE_ARRAY, pageSize); + ColumnPage page = ColumnPage.newVarLengthPath(DataType.BYTE_ARRAY, pageSize); + page.setStatsCollector(VarLengthPageStatsCollector.newInstance()); + noDictDimensionPage[i] = page; } complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()]; for (int i = 0; i < complexDimensionPage.length; i++) { @@ -84,21 +112,32 @@ public class TablePage { measurePage = new ColumnPage[model.getMeasureCount()]; DataType[] dataTypes = model.getMeasureDataType(); for (int i = 0; i < measurePage.length; i++) { - measurePage[i] = ColumnPage.newPage(dataTypes[i], pageSize); + ColumnPage page = ColumnPage.newPage(dataTypes[i], pageSize); + page.setStatsCollector(PrimitivePageStatsCollector.newInstance(dataTypes[i], pageSize)); + measurePage[i] = page; } + boolean hasNoDictionary = noDictDimensionPage.length > 0; + this.key = new TablePageKey(pageSize, model.getMDKeyGenerator(), model.getSegmentProperties(), + hasNoDictionary); } /** - * Add one row to the internal store, it will be converted into columnar layout + * Add one row to the internal store * * @param rowId Id of the input row * @param row row object */ public void addRow(int rowId, CarbonRow row) throws KeyGenException { - // convert each column category + // convert each column category, update key and stats + byte[] mdk = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator()); + convertToColumnarAndAddToPages(rowId, row, mdk); + key.update(rowId, row, mdk); + } + // convert the input row object to columnar data and add to column pages + private void convertToColumnarAndAddToPages(int rowId, CarbonRow row, byte[] mdk) + throws KeyGenException { // 1. convert dictionary columns - byte[] mdk = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator()); byte[][] keys = model.getSegmentProperties().getFixedLengthKeySplitter().splitKey(mdk); for (int i = 0; i < dictDimensionPage.length; i++) { dictDimensionPage[i].putData(rowId, keys[i]); @@ -137,11 +176,6 @@ public class TablePage { } measurePage[i].putData(rowId, value); } - - // update statistics if it is last row - if (rowId + 1 == pageSize) { - this.measurePageStatistics = new MeasurePageStatsVO(measurePage); - } } /** @@ -160,10 +194,10 @@ public class TablePage { // initialize the page if first row if (rowId == 0) { int depthInComplexColumn = complexDataType.getColsCount(); - getComplexDimensionPage()[index] = new ComplexColumnPage(pageSize, depthInComplexColumn); + complexDimensionPage[index] = new ComplexColumnPage(pageSize, depthInComplexColumn); } - int depthInComplexColumn = getComplexDimensionPage()[index].getDepth(); + int depthInComplexColumn = complexDimensionPage[index].getDepth(); // this is the result columnar data which will be added to page, // size of this list is the depth of complex column, we will fill it by input data List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>(); @@ -187,8 +221,7 @@ public class TablePage { } for (int depth = 0; depth < depthInComplexColumn; depth++) { - getComplexDimensionPage()[index] - .putComplexData(rowId, depth, encodedComplexColumnar.get(depth)); + complexDimensionPage[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth)); } } @@ -217,26 +250,161 @@ public class TablePage { return output; } - ColumnPage[] getDictDimensionPage() { - return dictDimensionPage; + EncodedTablePage encode() throws KeyGenException, MemoryException, IOException { + // encode dimensions and measure + EncodedDimensionPage[] dimensions = encodeAndCompressDimensions(); + EncodedMeasurePage[] measures = encodeAndCompressMeasures(); + return EncodedTablePage.newInstance(pageSize, dimensions, measures, key); } - ColumnPage[] getNoDictDimensionPage() { - return noDictDimensionPage; + private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy(); + + // apply measure and set encodedData in `encodedData` + private EncodedMeasurePage[] encodeAndCompressMeasures() + throws MemoryException, IOException { + EncodedMeasurePage[] encodedMeasures = new EncodedMeasurePage[measurePage.length]; + for (int i = 0; i < measurePage.length; i++) { + SimpleStatsResult stats = (SimpleStatsResult)(measurePage[i].getStatistics()); + ColumnPageCodec encoder = encodingStrategy.createCodec(stats); + encodedMeasures[i] = (EncodedMeasurePage) encoder.encode(measurePage[i]); + } + return encodedMeasures; + } + + private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort, + boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException { + if (isUseInvertedIndex) { + if (version == ColumnarFormatVersion.V3) { + return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort); + } else { + return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort); + } + } else { + if (version == ColumnarFormatVersion.V3) { + return new BlockIndexerStorageForNoInvertedIndexForShort(data, false); + } else { + return new BlockIndexerStorageForNoInvertedIndexForInt(data); + } + } } - ComplexColumnPage[] getComplexDimensionPage() { - return complexDimensionPage; + private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort, + boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException { + if (isUseInvertedIndex) { + if (version == ColumnarFormatVersion.V3) { + return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort); + } else { + return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort); + } + } else { + if (version == ColumnarFormatVersion.V3) { + return new BlockIndexerStorageForNoInvertedIndexForShort(data, false); + } else { + return new BlockIndexerStorageForNoInvertedIndexForInt(data); + } + } } - ColumnPage[] getMeasurePage() { - return measurePage; + private IndexStorage encodeAndCompressComplexDimension(byte[][] data) { + if (version == ColumnarFormatVersion.V3) { + return new BlockIndexerStorageForShort(data, false, false, false); + } else { + return new BlockIndexerStorageForInt(data, false, false, false); + } } - MeasurePageStatsVO getMeasureStats() { - return measurePageStatistics; + private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort, + boolean isUseInvertedIndex, boolean isRleApplicable) { + if (isUseInvertedIndex) { + if (version == ColumnarFormatVersion.V3) { + return new BlockIndexerStorageForShort(data, isRleApplicable, true, isSort); + } else { + return new BlockIndexerStorageForInt(data, isRleApplicable, true, isSort); + } + } else { + if (version == ColumnarFormatVersion.V3) { + return new BlockIndexerStorageForNoInvertedIndexForShort(data, true); + } else { + return new BlockIndexerStorageForNoInvertedIndexForInt(data); + } + } } + // apply and compress each dimension, set encoded data in `encodedData` + private EncodedDimensionPage[] encodeAndCompressDimensions() + throws KeyGenException { + TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec(); + int dictionaryColumnCount = -1; + int noDictionaryColumnCount = -1; + int indexStorageOffset = 0; + IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()]; + Compressor compressor = CompressorFactory.getInstance().getCompressor(); + EncodedDimensionPage[] compressedColumns = new EncodedDimensionPage[indexStorages.length]; + boolean[] isUseInvertedIndex = model.getIsUseInvertedIndex(); + for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) { + ColumnPage page; + byte[] flattened; + boolean isSortColumn = model.isSortColumn(i); + switch (dimensionSpec.getType(i)) { + case GLOBAL_DICTIONARY: + // dictionary dimension + page = dictDimensionPage[++dictionaryColumnCount]; + indexStorages[indexStorageOffset] = encodeAndCompressDictDimension( + page.getByteArrayPage(), + isSortColumn, + isUseInvertedIndex[i] & isSortColumn, + CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i))); + flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage()); + break; + case DIRECT_DICTIONARY: + // timestamp and date column + page = dictDimensionPage[++dictionaryColumnCount]; + indexStorages[indexStorageOffset] = encodeAndCompressDirectDictDimension( + page.getByteArrayPage(), + isSortColumn, + isUseInvertedIndex[i] & isSortColumn, + CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i))); + flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage()); + break; + case PLAIN_VALUE: + // high cardinality dimension, encoded as plain string + page = noDictDimensionPage[++noDictionaryColumnCount]; + indexStorages[indexStorageOffset] = encodeAndCompressNoDictDimension( + page.getByteArrayPage(), + isSortColumn, + isUseInvertedIndex[i] & isSortColumn, + CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i))); + flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage()); + break; + case COMPLEX: + // we need to add complex column at last, so skipping it here + continue; + default: + throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i)); + } + byte[] compressedData = compressor.compressByte(flattened); + compressedColumns[indexStorageOffset] = new EncodedDimensionPage( + pageSize, compressedData, indexStorages[indexStorageOffset], dimensionSpec.getType(i)); + SimpleStatsResult stats = (SimpleStatsResult) page.getStatistics(); + compressedColumns[indexStorageOffset].setNullBitSet(stats.getNullBits()); + indexStorageOffset++; + } + + // handle complex type column + for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) { + Iterator<byte[][]> iterator = complexDimensionPage[i].iterator(); + while (iterator.hasNext()) { + byte[][] data = iterator.next(); + indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data); + byte[] flattened = ByteUtil.flatten(data); + byte[] compressedData = compressor.compressByte(flattened); + compressedColumns[indexStorageOffset] = new EncodedDimensionPage( + pageSize, compressedData, indexStorages[indexStorageOffset], DimensionType.COMPLEX); + indexStorageOffset++; + } + } + return compressedColumns; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java deleted file mode 100644 index 8547845..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.store; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.carbondata.core.datastore.TableSpec; -import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt; -import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt; -import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort; -import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort; -import org.apache.carbondata.core.datastore.columnar.IndexStorage; -import org.apache.carbondata.core.datastore.compression.Compressor; -import org.apache.carbondata.core.datastore.compression.CompressorFactory; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec; -import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy; -import org.apache.carbondata.core.datastore.page.encoding.EncodedData; -import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy; -import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.ColumnarFormatVersion; -import org.apache.carbondata.core.util.ByteUtil; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; - -class TablePageEncoder { - - private ColumnarFormatVersion version; - - private boolean[] isUseInvertedIndex; - - private CarbonFactDataHandlerModel model; - - private static final EncodingStrategy encodingStrategy = new DefaultEncodingStrategy(); - - TablePageEncoder(CarbonFactDataHandlerModel model) { - this.version = CarbonProperties.getInstance().getFormatVersion(); - this.model = model; - this.isUseInvertedIndex = model.getIsUseInvertedIndex(); - } - - // function to apply all columns in one table page - EncodedData encode(TablePage tablePage) - throws KeyGenException, MemoryException, IOException { - EncodedData encodedData = new EncodedData(); - encodeAndCompressDimensions(tablePage, encodedData); - encodeAndCompressMeasures(tablePage, encodedData); - return encodedData; - } - - // apply measure and set encodedData in `encodedData` - private void encodeAndCompressMeasures(TablePage tablePage, EncodedData encodedData) - throws MemoryException, IOException { - ColumnPage[] measurePage = tablePage.getMeasurePage(); - byte[][] encodedMeasures = new byte[measurePage.length][]; - for (int i = 0; i < measurePage.length; i++) { - ColumnPageCodec encoder = encodingStrategy.createCodec(measurePage[i].getStatistics()); - encodedMeasures[i] = encoder.encode(measurePage[i]); - } - encodedData.measures = encodedMeasures; - } - - private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort, - boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException { - if (isUseInvertedIndex) { - if (version == ColumnarFormatVersion.V3) { - return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort); - } else { - return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort); - } - } else { - if (version == ColumnarFormatVersion.V3) { - return new BlockIndexerStorageForNoInvertedIndexForShort(data, false); - } else { - return new BlockIndexerStorageForNoInvertedIndexForInt(data); - } - } - } - - private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort, - boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException { - if (isUseInvertedIndex) { - if (version == ColumnarFormatVersion.V3) { - return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort); - } else { - return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort); - } - } else { - if (version == ColumnarFormatVersion.V3) { - return new BlockIndexerStorageForNoInvertedIndexForShort(data, false); - } else { - return new BlockIndexerStorageForNoInvertedIndexForInt(data); - } - } - } - - private IndexStorage encodeAndCompressComplexDimension(byte[][] data) { - if (version == ColumnarFormatVersion.V3) { - return new BlockIndexerStorageForShort(data, false, false, false); - } else { - return new BlockIndexerStorageForInt(data, false, false, false); - } - } - - private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort, - boolean isUseInvertedIndex, boolean isRleApplicable) { - if (isUseInvertedIndex) { - if (version == ColumnarFormatVersion.V3) { - return new BlockIndexerStorageForShort(data, isRleApplicable, true, isSort); - } else { - return new BlockIndexerStorageForInt(data, isRleApplicable, true, isSort); - } - } else { - if (version == ColumnarFormatVersion.V3) { - return new BlockIndexerStorageForNoInvertedIndexForShort(data, true); - } else { - return new BlockIndexerStorageForNoInvertedIndexForInt(data); - } - } - } - - // apply and compress each dimension, set encoded data in `encodedData` - private void encodeAndCompressDimensions(TablePage tablePage, EncodedData encodedData) - throws KeyGenException { - TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec(); - int dictionaryColumnCount = -1; - int noDictionaryColumnCount = -1; - int indexStorageOffset = 0; - IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()]; - Compressor compressor = CompressorFactory.getInstance().getCompressor(); - byte[][] compressedColumns = new byte[indexStorages.length][]; - for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) { - byte[] flattened; - boolean isSortColumn = model.isSortColumn(i); - switch (dimensionSpec.getType(i)) { - case GLOBAL_DICTIONARY: - // dictionary dimension - indexStorages[indexStorageOffset] = encodeAndCompressDictDimension( - tablePage.getDictDimensionPage()[++dictionaryColumnCount].getByteArrayPage(), - isSortColumn, isUseInvertedIndex[i] & isSortColumn, - CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i))); - flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage()); - break; - case DIRECT_DICTIONARY: - // timestamp and date column - indexStorages[indexStorageOffset] = encodeAndCompressDirectDictDimension( - tablePage.getDictDimensionPage()[++dictionaryColumnCount].getByteArrayPage(), - isSortColumn, isUseInvertedIndex[i] & isSortColumn, - CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i))); - flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage()); - break; - case PLAIN_VALUE: - // high cardinality dimension, encoded as plain string - indexStorages[indexStorageOffset] = encodeAndCompressNoDictDimension( - tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getByteArrayPage(), - isSortColumn, isUseInvertedIndex[i] & isSortColumn, - CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i))); - flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage()); - break; - case COMPLEX: - // we need to add complex column at last, so skipping it here - continue; - default: - throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i)); - } - compressedColumns[indexStorageOffset] = compressor.compressByte(flattened); - indexStorageOffset++; - } - - // handle complex type column - for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) { - Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[i].iterator(); - while (iterator.hasNext()) { - byte[][] data = iterator.next(); - indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data); - byte[] flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage()); - compressedColumns[indexStorageOffset] = compressor.compressByte(flattened); - indexStorageOffset++; - } - } - - encodedData.indexStorages = indexStorages; - encodedData.dimensions = compressedColumns; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java deleted file mode 100644 index 3cb4777..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.store; - -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; -import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.processing.util.NonDictionaryUtil; - -public class TablePageKey { - private int pageSize; - - private byte[][] currentNoDictionaryKey; - - // MDK start key - private byte[] startKey; - - // MDK end key - private byte[] endKey; - - // startkey for no dictionary columns - private byte[][] noDictStartKey; - - // endkey for no diciotn - private byte[][] noDictEndKey; - - // startkey for no dictionary columns after packing into one column - private byte[] packedNoDictStartKey; - - // endkey for no dictionary columns after packing into one column - private byte[] packedNoDictEndKey; - - private CarbonFactDataHandlerModel model; - - TablePageKey(CarbonFactDataHandlerModel model, int pageSize) { - this.model = model; - this.pageSize = pageSize; - } - - /** update all keys based on the input row */ - void update(int rowId, CarbonRow row) throws KeyGenException { - byte[] currentMDKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator()); - if (model.getNoDictionaryCount() > 0 || model.getComplexIndexMap().size() > 0) { - currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row); - } - if (rowId == 0) { - startKey = currentMDKey; - noDictStartKey = currentNoDictionaryKey; - } - endKey = currentMDKey; - noDictEndKey = currentNoDictionaryKey; - if (rowId == pageSize - 1) { - finalizeKeys(); - } - } - - /** update all keys if SORT_COLUMNS option is used when creating table */ - private void finalizeKeys() { - // If SORT_COLUMNS is used, may need to update start/end keys since the they may - // contains dictionary columns that are not in SORT_COLUMNS, which need to be removed from - // start/end key - int numberOfDictSortColumns = model.getSegmentProperties().getNumberOfDictSortColumns(); - if (numberOfDictSortColumns > 0) { - // if SORT_COLUMNS contain dictionary columns - int[] keySize = model.getSegmentProperties().getFixedLengthKeySplitter().getBlockKeySize(); - if (keySize.length > numberOfDictSortColumns) { - // if there are some dictionary columns that are not in SORT_COLUMNS, it will come to here - int newMdkLength = 0; - for (int i = 0; i < numberOfDictSortColumns; i++) { - newMdkLength += keySize[i]; - } - byte[] newStartKeyOfSortKey = new byte[newMdkLength]; - byte[] newEndKeyOfSortKey = new byte[newMdkLength]; - System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength); - System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength); - startKey = newStartKeyOfSortKey; - endKey = newEndKeyOfSortKey; - } - } else { - startKey = new byte[0]; - endKey = new byte[0]; - } - - // Do the same update for noDictionary start/end Key - int numberOfNoDictSortColumns = model.getSegmentProperties().getNumberOfNoDictSortColumns(); - if (numberOfNoDictSortColumns > 0) { - // if sort_columns contain no-dictionary columns - if (noDictStartKey.length > numberOfNoDictSortColumns) { - byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][]; - byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][]; - System.arraycopy( - noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns); - System.arraycopy( - noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns); - noDictStartKey = newNoDictionaryStartKey; - noDictEndKey = newNoDictionaryEndKey; - } - packedNoDictStartKey = - NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey); - packedNoDictEndKey = - NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey); - } - } - - public byte[] getStartKey() { - return startKey; - } - - public byte[] getEndKey() { - return endKey; - } - - public byte[] getNoDictStartKey() { - return packedNoDictStartKey; - } - - public byte[] getNoDictEndKey() { - return packedNoDictEndKey; - } - - public int getPageSize() { - return pageSize; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java deleted file mode 100644 index 13eaac9..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.store; - -import java.nio.ByteBuffer; -import java.util.BitSet; - -import org.apache.carbondata.core.datastore.TableSpec; -import org.apache.carbondata.core.datastore.columnar.IndexStorage; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.datastore.page.encoding.EncodedData; -import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO; -import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO; - -// Statistics of dimension and measure column in a TablePage -public class TablePageStatistics { - - // number of dimension after complex column expanded - private int numDimensionsExpanded; - - // min of each dimension column - private byte[][] dimensionMinValue; - - // max of each dimension column - private byte[][] dimensionMaxValue; - - // min of each measure column - private byte[][] measureMinValue; - - // max os each measure column - private byte[][] measureMaxValue; - - // null bit set for each measure column - private BitSet[] nullBitSet; - - // measure stats - // TODO: there are redundant stats - private MeasurePageStatsVO measurePageStatistics; - - private TableSpec tableSpec; - - TablePageStatistics(TableSpec tableSpec, TablePage tablePage, - EncodedData encodedData, MeasurePageStatsVO measurePageStatistics) { - this.numDimensionsExpanded = tableSpec.getDimensionSpec().getNumExpandedDimensions(); - int numMeasures = tableSpec.getMeasureSpec().getNumMeasures(); - this.dimensionMinValue = new byte[numDimensionsExpanded][]; - this.dimensionMaxValue = new byte[numDimensionsExpanded][]; - this.measureMinValue = new byte[numMeasures][]; - this.measureMaxValue = new byte[numMeasures][]; - this.nullBitSet = new BitSet[numMeasures]; - this.tableSpec = tableSpec; - this.measurePageStatistics = measurePageStatistics; - updateMinMax(tablePage, encodedData); - updateNullBitSet(tablePage); - } - - private void updateMinMax(TablePage tablePage, EncodedData encodedData) { - IndexStorage[] keyStorageArray = encodedData.indexStorages; - byte[][] measureArray = encodedData.measures; - - for (int i = 0; i < numDimensionsExpanded; i++) { - switch (tableSpec.getDimensionSpec().getType(i)) { - case GLOBAL_DICTIONARY: - case DIRECT_DICTIONARY: - case COLUMN_GROUP: - case COMPLEX: - dimensionMinValue[i] = keyStorageArray[i].getMin(); - dimensionMaxValue[i] = keyStorageArray[i].getMax(); - break; - case PLAIN_VALUE: - dimensionMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin()); - dimensionMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax()); - break; - } - } - for (int i = 0; i < measureArray.length; i++) { - ColumnPageStatsVO stats = tablePage.getMeasurePage()[i].getStatistics(); - measureMaxValue[i] = stats.minBytes(); - measureMinValue[i] = stats.maxBytes(); - } - } - - private void updateNullBitSet(TablePage tablePage) { - nullBitSet = new BitSet[tablePage.getMeasurePage().length]; - ColumnPage[] measurePages = tablePage.getMeasurePage(); - for (int i = 0; i < nullBitSet.length; i++) { - nullBitSet[i] = measurePages[i].getNullBitSet(); - } - } - - /** - * Below method will be used to update the min or max value - * by removing the length from it - * - * @return min max value without length - */ - private byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) { - ByteBuffer buffer = ByteBuffer.wrap(valueWithLength); - byte[] actualValue = new byte[buffer.getShort()]; - buffer.get(actualValue); - return actualValue; - } - - public byte[][] getDimensionMinValue() { - return dimensionMinValue; - } - - public byte[][] getDimensionMaxValue() { - return dimensionMaxValue; - } - - public byte[][] getMeasureMinValue() { - return measureMinValue; - } - - public byte[][] getMeasureMaxValue() { - return measureMaxValue; - } - - public BitSet[] getNullBitSet() { - return nullBitSet; - } - - public MeasurePageStatsVO getMeasurePageStatistics() { - return measurePageStatistics; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 5d9e081..75a9c45 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -42,6 +42,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.datastore.page.EncodedTablePage; import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor; import org.apache.carbondata.core.metadata.BlockletInfoColumnar; import org.apache.carbondata.core.metadata.CarbonMetadata; @@ -58,7 +59,6 @@ import org.apache.carbondata.core.util.CarbonMergerUtil; import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.NodeHolder; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonIndexFileWriter; @@ -557,7 +557,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< * * @throws CarbonDataWriterException */ - @Override public void writeBlockletInfoToFile() throws CarbonDataWriterException { + @Override public void writeFooterToFile() throws CarbonDataWriterException { if (this.blockletInfoList.size() > 0) { writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath); } @@ -571,7 +571,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< * @throws CarbonDataWriterException * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem */ - public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException; + public abstract void writeTablePage(EncodedTablePage encodedTablePage) + throws CarbonDataWriterException; /** * Below method will be used to update the min or max value @@ -587,36 +588,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< } /** - * Below method will be used to update the no dictionary start and end key - * - * @param key key to be updated - * @return return no dictionary key - */ - protected byte[] updateNoDictionaryStartAndEndKey(byte[] key) { - if (key.length == 0) { - return key; - } - // add key to byte buffer remove the length part of the data - ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2); - // create a output buffer without length - ByteBuffer output = ByteBuffer.allocate(key.length - 2); - short numberOfByteToStorLength = 2; - // as length part is removed, so each no dictionary value index - // needs to be reshuffled by 2 bytes - int NumberOfNoDictSortColumns = - dataWriterVo.getSegmentProperties().getNumberOfNoDictSortColumns(); - for (int i = 0; i < NumberOfNoDictSortColumns; i++) { - output.putShort((short) (buffer.getShort() - numberOfByteToStorLength)); - } - // copy the data part - while (buffer.hasRemaining()) { - output.put(buffer.get()); - } - output.rewind(); - return output.array(); - } - - /** * This method will copy the carbon data file from local store location to * carbon store location */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/fecafde8/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java index 56ee762..f194f74 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java @@ -18,35 +18,21 @@ package org.apache.carbondata.processing.store.writer; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; -import org.apache.carbondata.core.datastore.page.encoding.EncodedData; -import org.apache.carbondata.core.util.NodeHolder; -import org.apache.carbondata.processing.store.TablePageKey; -import org.apache.carbondata.processing.store.TablePageStatistics; +import org.apache.carbondata.core.datastore.page.EncodedTablePage; public interface CarbonFactDataWriter<T> { /** - * This method will be used to create NodeHolder for a table page + * write a encoded table page */ - - NodeHolder buildDataNodeHolder(EncodedData encoded, TablePageStatistics stats, - TablePageKey key) throws CarbonDataWriterException; - - /** - * If node holder flag is enabled the object will be added to list - * and all the blocklets will be return together. If disabled then this - * method will itself will call for writing the fact data - * - * @param holder - */ - void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException; + void writeTablePage(EncodedTablePage encodedTablePage) throws CarbonDataWriterException; /** * Below method will be used to write the leaf meta data to file * * @throws CarbonDataWriterException */ - void writeBlockletInfoToFile() throws CarbonDataWriterException; + void writeFooterToFile() throws CarbonDataWriterException; /** * Below method will be used to initialise the writer
