Yes, Ravindra, unsafe sort will be better. In my last mail, I mentioned an 8-byte encoded format for RowID + SORT_COLUMNS; if the SORT_COLUMNS are dictionary encoded, I think it is effectively like unsafe, since each entry is just a byte[8], right? So we can do this ourselves instead of depending on a third-party library (sun.misc.Unsafe)?

Regards,
Jacky
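As a rough illustration of that 8-byte format, here is a minimal sketch assuming two dictionary-encoded sort columns whose surrogate keys fit in 16 bits each, plus a 32-bit row id; the class and method names are hypothetical, not existing CarbonData code:

    public final class PackedSortKey {

      // Layout of the 8 bytes: [sortCol0: 16 bits][sortCol1: 16 bits][rowId: 32 bits]
      public static long pack(int surrogate0, int surrogate1, int rowId) {
        return ((long) (surrogate0 & 0xFFFF) << 48)
            | ((long) (surrogate1 & 0xFFFF) << 32)
            | (rowId & 0xFFFFFFFFL);
      }

      public static int rowIdOf(long packed) {
        return (int) packed;              // low 32 bits
      }

      public static int leadingSurrogateOf(long packed) {
        return (int) (packed >>> 48);     // high 16 bits
      }
    }

Sorting a long[] of such packed keys with the regular JDK sort then gives the SORT_COLUMNS order without any Object allocation, as long as the leading surrogate key stays below 32768 (so the sign bit is never set); otherwise the keys should be compared with Long.compareUnsigned.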
> On 22 May 2017, at 11:54 AM, Ravindra Pesala <ravi.pes...@gmail.com> wrote:
>
> Hi,
>
> Using Object[] as a row while loading is not efficient in terms of memory usage. It would be more efficient to keep the rows in unsafe memory, as it can store the data in a more compact way per data type.
>
> And regarding sorting, it would be good to concentrate on a single sorting solution. Since we already have a stable unsafe sort, we should consider removing the other normal sorting implementation to reduce the maintenance cost, and concentrate more on improving the unsafe sort.
>
> As per the observation, GC is reduced a lot in the case of unsafe sort, but some GC is still observed. I think that is because of the writer step, as it still holds the data on heap for some time while sorting and writing the data blocklet-wise. We can consider keeping this data in unsafe memory as well to avoid GC.
>
> Regards,
> Ravindra.
>
> On 21 May 2017 at 23:15, Jacky Li <jacky.li...@qq.com> wrote:
>
>> Hi,
>>
>> While working on the data load improvement and the encoding override feature, I found that it is not efficient to use CarbonRow with Object[]. I think a better way is to use fixed-length primitive types instead of Object.
>>
>> Since SORT_COLUMNS is now merged, I think it is possible to:
>> 1. separate key columns and non-key columns
>> 2. use the exact data type for all columns instead of Object
>> 3. not differentiate dimensions and measures while loading; encoding is based on the default strategy or a user override
>>
>> For example:
>>
>> TablePage {
>>   ColumnPage[] keyPages;
>>   ColumnPage[] nonKeyPages;
>> }
>>
>> ColumnPage has the following implementations:
>> 1. FixedLengthColumnPage // internal data type: byte, short, int, long, float, double; dictionary encoded columns should also use this class
>> 2. StringColumnPage // internal data type: byte[]
>> 3. DecimalColumnPage // internal data type: byte[]
>> 4. ComplexColumnPage // internal data type: byte[]
>>
>> If the user specifies only dictionary columns in SORT_COLUMNS and the cardinality is low, we can further optimize the keyPages member to a long data type (it can hold up to 4 columns, each with cardinality less than 65535), so that it fits in the CPU cache and sorting is more efficient.
>>
>> Regards,
>> Jacky
>>
>>> On 27 Apr 2017, at 9:20 AM, Jacky Li <jacky.li...@qq.com> wrote:
>>>
>>> Hi community,
>>>
>>> More ideas on the loading process. In my opinion the ideal one should be as follows, please comment:
>>>
>>> 1. input step:
>>> - Parse the input data, either CSV or Dataframe; both are converted into CarbonRow.
>>> - Buffer the rows into CarbonRowBatch.
>>> - Prefetch rows.
>>>
>>> 2. convert step:
>>> - Convert fields based on the category of the column:
>>>   - for global dictionary encoded columns, convert them into dictionary ids
>>>   - for primitive type columns, just keep the original type (no conversion)
>>>   - for string and variable-length data type columns (like complex columns), convert to an LV (length-value) encoded byte array
>>>
>>> - Convert input rows to columnar layout as we read from the previous step. Create a ColumnPage for each column, and a DataPage for all columns.
>>>
>>> - The DataPage should store the key columns specified by SORT_COLUMNS together in one vector, so that sorting is efficient for the CPU cache.
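A minimal sketch of the LV (length-value) byte[] layout mentioned above for string and variable-length columns, assuming a 2-byte length prefix (so values must be under 64 KB); the class name is hypothetical, not existing CarbonData code:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public final class LvEncoder {

      // Encode as [2-byte length][UTF-8 bytes]; assumes the value is under 64 KB.
      public static byte[] encode(String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(2 + utf8.length);
        buffer.putShort((short) utf8.length);    // length prefix
        buffer.put(utf8);                        // value bytes
        return buffer.array();
      }

      public static String decode(byte[] lv) {
        ByteBuffer buffer = ByteBuffer.wrap(lv);
        int length = buffer.getShort() & 0xFFFF; // read length as unsigned
        byte[] utf8 = new byte[length];
        buffer.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
      }
    }

Dictionary and primitive columns keep their fixed-length representation, so only string and complex columns pay the length prefix.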
>>> 3. index-build step:
>>> - Based on configuration, there can be an index-build step, or this step can be skipped if SORT_COLUMNS is empty, which indicates no sort.
>>>
>>> - Add the DataPage into a Sorter; the sorter sorts the incoming DataPages on the fly. The sort result is an array of rowIds (pageId + offset). Avoid using the JDK's Arrays.sort.
>>>
>>> - The sorter should support batch sort or merge sort. For merge sort, multiple spilling strategies can be implemented.
>>>
>>> 4. write step:
>>> - Calculate the start/end key of the MDKey over the ColumnPage.
>>>
>>> - Calculate the statistics of the ColumnPage. Currently in carbondata the min/max stats are hard coded. As a first step of extension, we want to provide more stats, like histogram and bloom filter, that the user can choose from. An optional field needs to be added in the thrift definition of the DataChunk struct.
>>>
>>> - Encode and compress each column into a byte array. To make encoding more transparent to the user and extensible, we should first decouple the data types from the available codec options, so the code can choose a codec for a specific data type by some strategy. For the strategy, we can have a default one as a beginning, and later open this interface for developers to extend (say, a developer can choose a codec based on some heuristic rules over the data distribution).
>>>
>>> - After each column is encoded and compressed, follow the current Producer-Consumer logic to write to disk.
>>>
>>> As the bottleneck will be the merge sort, some thoughts on the merge sort strategy:
>>> 1. spill both key columns and non-key columns in sorted order
>>> 2. spill key columns in sorted order and non-key columns in original order, and spill the rowIds as well; apply the rowId mapping only in the merge stage. This avoids some random memory access on non-key columns and is good if there are many non-key columns
>>> 3. do not spill key columns, and spill non-key columns in original order. This is good as we can keep more key columns in memory
>>>
>>> Regards,
>>> Jacky
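A minimal sketch of the rowId-based sort described in the index-build step above: only an array of row ids is permuted, while the page data itself stays in place. KeyPage and getKey are hypothetical stand-ins for the real page structures, and the JDK sort is used only to keep the sketch short (the proposal suggests avoiding it):

    import java.util.Arrays;

    public final class RowIdSorter {

      /** Hypothetical view over the key columns of one loaded page. */
      public interface KeyPage {
        long getKey(int rowId);   // e.g. packed dictionary surrogate keys
        int getRowCount();
      }

      /** Returns the row ids of the page in sorted key order; page data is not moved. */
      public static int[] sortedRowIds(KeyPage page) {
        int rowCount = page.getRowCount();
        Integer[] rowIds = new Integer[rowCount];
        for (int i = 0; i < rowCount; i++) {
          rowIds[i] = i;
        }
        // Boxing and a JDK sort keep the sketch short; a real implementation
        // would sort a primitive int[] to avoid the overhead noted above.
        Arrays.sort(rowIds, (a, b) -> Long.compare(page.getKey(a), page.getKey(b)));
        int[] sorted = new int[rowCount];
        for (int i = 0; i < rowCount; i++) {
          sorted[i] = rowIds[i];
        }
        return sorted;
      }
    }

When spilling for merge sort, the non-key columns can then be written either in this sorted order (strategy 1) or in their original order together with the sorted row ids (strategy 2).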
>>>> On 22 Apr 2017, at 8:08 PM, Liang Chen <chenliang6...@gmail.com> wrote:
>>>>
>>>> Jacky, thank you for listing these constructive improvements to data loading.
>>>>
>>>> I agree with considering all these improvement points; only about the one below I have some concerns. Before considering open interfaces for data loading, we need to more clearly define which roles block/blocklet/page play; then we could consider some interfaces at block level, some at blocklet level, and some at page level.
>>>>
>>>> Let us take up these improvements in the coming release.
>>>> --------------------------------------------------------------------------------------------------------------------
>>>> improvement 3: we want to open up some interfaces for letting developers add more page encoding, statistics, page compression. These interfaces will be like callbacks, so a developer can write a new encoding/statistics/compression method and the carbon loading process will invoke it in this step. This interface will be like:
>>>>
>>>> Regards
>>>> Liang
>>>>
>>>> 2017-04-21 14:50 GMT+08:00 Jacky Li <jacky.li...@qq.com>:
>>>>
>>>>> I want to propose the following improvements to the data loading process:
>>>>>
>>>>> Currently different steps use different data layouts in CarbonRow, and the data is converted back and forth between steps. It is not easy for developers to understand the data structure used in each step, and it increases the memory requirement because unnecessary data copying is done in some steps. So, I suggest to improve it as follows:
>>>>>
>>>>> 1) input step: read the input and create a CarbonRow with all fields as string type.
>>>>>
>>>>> 2) convert step: convert strings to byte[] according to the data type. This step has a compression effect on the input row, so it is good for saving memory, and it also takes care of null values:
>>>>>   if it is a dictionary dimension, convert to the surrogate key;
>>>>>   if it is a no-dictionary dimension, convert to the byte[] representation;
>>>>>   if it is a complex dimension, convert to the byte[] representation;
>>>>>   if it is a measure, convert to an Object like Integer, Long, Double according to the schema --> change to byte[] instead of storing an Object, to avoid boxing/unboxing and save memory.
>>>>>
>>>>> The conversion happens by updating the fields of the CarbonRow in place; no new CarbonRow should be created. However, there is a copy operation of the input CarbonRow for bad record handling --> do not copy the row; convert it back to the original value only if it is a bad record.
>>>>>
>>>>> 3) sort step:
>>>>> improvement 1: sort the collected input CarbonRows. Currently this is done by copying the row objects into an internal buffer, and the sort is done on that buffer --> to avoid copying the CarbonRow, we should create this buffer (with RowIDs) in the input step, and only output the sorted RowIDs (by swapping values in the RowID array) according to the row values. If it is a merge sort, then write to file based on this sorted RowID array when spilling to disk. So no copy of CarbonRow is required.
>>>>>
>>>>> improvement 2: when spilling to disk, currently the field order in CarbonRow is changed; it is written as a 3-element array [global dictionary dimensions, plain and complex dimensions, measure columns], because the merger expects it like this --> I think this is unnecessary; we can add serialization/deserialization capability to CarbonRow and use CarbonRow instead. In the case of a no-sort table, this also avoids the conversion in the write step.
>>>>>
>>>>> 4) write step: currently it collects one page of data (32K rows) and starts a Producer, which is actually the encoding process for one page. To support parallel processing, after the page data is encoded it is put into a queue which is taken by the Consumer; the Consumer collects pages up to one blocklet size (configurable, say 64MB) and writes them to CarbonData files.
>>>>>
>>>>> improvement 1: there is an unnecessary data copy and re-ordering of the fields of the row. It converts the row to [measure columns, plain and complex dimensions, global dictionary dimensions], which is different from what the sort step outputs --> so I suggest to use CarbonRow only; no new row object should be created here.
>>>>>
>>>>> improvement 2: there are multiple traversals of the page data in the current code --> we should change this to first convert the CarbonRows to a ColumnarPage, which is the columnar representation of all columns in one page, and collect the start/end key and statistics while doing this columnar conversion. Then apply the inverted index, RLE and compression processes on the ColumnarPage object.
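A simplified, hypothetical sketch of the single-pass conversion described in improvement 2 above: rows are transposed into a columnar page while min/max statistics are collected in the same traversal. The int-only ColumnarPage below is a toy stand-in, not the actual CarbonData class:

    import java.util.Arrays;

    public final class ColumnarPage {

      private final int[][] columns;   // columns[columnIndex][rowIndex]
      private final int[] min;
      private final int[] max;

      private ColumnarPage(int[][] columns, int[] min, int[] max) {
        this.columns = columns;
        this.min = min;
        this.max = max;
      }

      /** Build the columnar page from row-major data in one traversal. */
      public static ColumnarPage fromRows(int[][] rows, int columnCount) {
        int rowCount = rows.length;
        int[][] columns = new int[columnCount][rowCount];
        int[] min = new int[columnCount];
        int[] max = new int[columnCount];
        Arrays.fill(min, Integer.MAX_VALUE);
        Arrays.fill(max, Integer.MIN_VALUE);
        for (int r = 0; r < rowCount; r++) {
          for (int c = 0; c < columnCount; c++) {
            int value = rows[r][c];
            columns[c][r] = value;                 // transpose to columnar layout
            if (value < min[c]) min[c] = value;    // update stats on the fly
            if (value > max[c]) max[c] = value;
          }
        }
        return new ColumnarPage(columns, min, max);
      }

      public int[] column(int c) { return columns[c]; }
      public int min(int c) { return min[c]; }
      public int max(int c) { return max[c]; }
    }

The same pass could also feed the start/end key calculation mentioned above, before the inverted index, RLE and compression are applied.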
>>>>> improvement 3: we want to open up some interfaces to let developers add more page encodings, statistics and page compressions. These interfaces will be like callbacks, so a developer can write a new encoding/statistics/compression method and the carbon loading process will invoke it in this step. The interfaces will look like:
>>>>>
>>>>> /**
>>>>>  * Codec for a column page, implementation should not keep state across pages,
>>>>>  * caller will use the same object to encode multiple pages
>>>>>  */
>>>>> interface PageCodec {
>>>>>   /** Codec name will be stored in BlockletHeader (DataChunk3) */
>>>>>   String getName();
>>>>>   void startPage(int pageID);
>>>>>   void processColumn(ColumnBatch batch);
>>>>>   byte[] endPage(int pageID);
>>>>>   ColumnBatch decode(byte[] encoded);
>>>>> }
>>>>>
>>>>> /** Compressor of the page data, the flow is encode->compress, and decompress->decode */
>>>>> interface PageCompressor {
>>>>>   /** Codec name will be stored in BlockletHeader (DataChunk3) */
>>>>>   String getName();
>>>>>   byte[] compress(byte[] encodedData);
>>>>>   byte[] decompress(byte[] data);
>>>>> }
>>>>>
>>>>> /** Calculate the statistics for a column page and blocklet */
>>>>> interface Statistics {
>>>>>   /** Codec name will be stored in BlockletHeader (DataChunk3) */
>>>>>   String getName();
>>>>>   void startPage(int pageID);
>>>>>   void endPage(int pageID);
>>>>>   void startBlocklet(int blockletID);
>>>>>   void endBlocklet(int blockletID);
>>>>>
>>>>>   /** Update the stats for the input batch */
>>>>>   void update(ColumnBatch batch);
>>>>>
>>>>>   /** Output will be written to DataChunk2 in BlockletHeader (DataChunk3) */
>>>>>   int[] getPageStatistics();
>>>>>
>>>>>   /** Output will be written to Footer */
>>>>>   int[] getBlockletStatistics();
>>>>> }
>>>>>
>>>>> And, a partition step should be added somewhere to support the partition feature (CARBONDATA-910); it depends on whether we implement the partition shuffling in the spark layer or the carbon layer (before the input step or after the convert step). What is the current idea on this? @CaiQiang @Lionel
>>>>>
>>>>> What do you think about these improvements?
>>>>>
>>>>> Regards,
>>>>> Jacky
>>>>
>>>> --
>>>> Regards
>>>> Liang
>
> --
> Thanks & Regards,
> Ravi
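To make the proposed extension point concrete, here is a minimal sketch of one possible PageCodec implementation. The ColumnBatch used here is a simplified assumption (a plain batch of int values such as dictionary surrogate keys), the PageCodec interface is repeated only so the sketch is self-contained, and delta encoding is just an example codec, not existing CarbonData code:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    // Simplified stand-in for the ColumnBatch used by the interfaces above.
    class ColumnBatch {
      final int[] values;
      ColumnBatch(int[] values) { this.values = values; }
    }

    // Mirrors the PageCodec interface sketched in the proposal above.
    interface PageCodec {
      String getName();
      void startPage(int pageID);
      void processColumn(ColumnBatch batch);
      byte[] endPage(int pageID);
      ColumnBatch decode(byte[] encoded);
    }

    /** Example codec: stores each value as a delta against the previous value. */
    class DeltaPageCodec implements PageCodec {

      private final List<int[]> batches = new ArrayList<>();

      public String getName() { return "DELTA_INT"; }

      public void startPage(int pageID) { batches.clear(); }

      public void processColumn(ColumnBatch batch) { batches.add(batch.values); }

      public byte[] endPage(int pageID) {
        int total = 0;
        for (int[] b : batches) total += b.length;
        ByteBuffer out = ByteBuffer.allocate(4 + 4 * total);
        out.putInt(total);
        int previous = 0;
        for (int[] b : batches) {
          for (int value : b) {
            out.putInt(value - previous);   // delta against previous value
            previous = value;
          }
        }
        return out.array();
      }

      public ColumnBatch decode(byte[] encoded) {
        ByteBuffer in = ByteBuffer.wrap(encoded);
        int total = in.getInt();
        int[] values = new int[total];
        int previous = 0;
        for (int i = 0; i < total; i++) {
          previous += in.getInt();          // add the delta back
          values[i] = previous;
        }
        return new ColumnBatch(values);
      }
    }

A PageCompressor or Statistics implementation would plug in the same way, and the getName() value is what would be recorded in the BlockletHeader so the reader can pick the matching decoder.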