Github user akashrn5 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2402#discussion_r197603140 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java --- @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.datastore.blocklet; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.page.FallbackColumnPageEncoder; +import org.apache.carbondata.core.datastore.page.FallbackEncodedColumnPage; +import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage; +import org.apache.carbondata.core.localdictionary.PageLevelDictionary; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.format.LocalDictionaryChunk; + +/** + * Maintains the list of encoded page of a column in a blocklet + * and encoded dictionary values only if column is encoded using local + * dictionary + * Handle the fallback if all the pages in blocklet are not + * encoded with local dictionary + */ +public class BlockletEncodedColumnPage { + + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(BlockletEncodedColumnPage.class.getName()); + + /** + * list of encoded page of a column in a blocklet + */ + private List<EncodedColumnPage> encodedColumnPageList; + + /** + * fallback executor service + */ + private ExecutorService fallbackExecutorService; + + /** + * to check whether pages are local dictionary encoded or not + */ + private boolean isLocalDictEncoded; + + /** + * page level dictionary only when column is encoded with local dictionary + */ + private PageLevelDictionary pageLevelDictionary; + + /** + * fallback future task queue; + */ + private ArrayDeque<Future<FallbackEncodedColumnPage>> fallbackFutureQueue; + + BlockletEncodedColumnPage(ExecutorService fallbackExecutorService, + EncodedColumnPage encodedColumnPage) 
{ + this.encodedColumnPageList = new ArrayList<>(); + this.fallbackExecutorService = fallbackExecutorService; + this.encodedColumnPageList.add(encodedColumnPage); + // if dimension page is local dictionary enabled and encoded with local dictionary + if (encodedColumnPage.isLocalDictionaryEnabled() && encodedColumnPage + .isLocalDictGeneratedPage()) { + this.isLocalDictEncoded = true; + // get first page dictionary + this.pageLevelDictionary = encodedColumnPage.getPageDictionary(); + } + } + + /** + * Below method will be used to add column page of a column + * + * @param encodedColumnPage + * encoded column page + * @throws ExecutionException + * failure in fallback + * @throws InterruptedException + * failure during fallback + */ + void addEncodedColumnTable(EncodedColumnPage encodedColumnPage) + throws ExecutionException, InterruptedException { + // if local dictionary is false or column is encoded with local dictionary then + // add a page + if (!isLocalDictEncoded || encodedColumnPage.isLocalDictGeneratedPage()) { + this.encodedColumnPageList.add(encodedColumnPage); + // merge page level dictionary values + if (null != this.pageLevelDictionary) { + pageLevelDictionary.mergerDictionaryValues(encodedColumnPage.getPageDictionary()); + } + } else { + // if older pages where encoded with dictionary and new pages are with dictionary + isLocalDictEncoded = false; + pageLevelDictionary = null; + this.fallbackFutureQueue = new ArrayDeque<>(); + LOGGER.info( + "Local dictionary Fallback is initiated for column: " + encodedColumnPageList.get(0) + .getActualPage().getColumnSpec().getFieldName()); + // submit all the older pages encoded with dictionary for fallback + for (int pageIndex = 0; pageIndex < encodedColumnPageList.size(); pageIndex++) { + fallbackFutureQueue.add(fallbackExecutorService.submit( + new FallbackColumnPageEncoder(encodedColumnPageList.get(pageIndex), pageIndex))); + } + //add to page list + this.encodedColumnPageList.add(encodedColumnPage); + } + } + + 
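/** + * Return the list of encoded page list for a column in a blocklet + * + * @return list of encoded page list + */ + public List<EncodedColumnPage> getEncodedColumnPageList() { + // if fall back queue is empty then for some pages fallback was initiated --- End diff -- correct the comment (the fallback queue is only created and filled in addEncodedColumnTable when fallback is initiated, so the comment should say that fallback was initiated for some pages when the queue is present, i.e. not null, rather than when it is empty)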
---