Github user akashrn5 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2402#discussion_r197603140
--- Diff:
core/src/main/java/org/apache/carbondata/core/datastore/blocklet/BlockletEncodedColumnPage.java
---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.blocklet;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.page.FallbackColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.FallbackEncodedColumnPage;
+import
org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+import org.apache.carbondata.core.localdictionary.PageLevelDictionary;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.format.LocalDictionaryChunk;
+
+/**
+ * Maintains the list of encoded page of a column in a blocklet
+ * and encoded dictionary values only if column is encoded using local
+ * dictionary
+ * Handle the fallback if all the pages in blocklet are not
+ * encoded with local dictionary
+ */
+public class BlockletEncodedColumnPage {
+
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+
LogServiceFactory.getLogService(BlockletEncodedColumnPage.class.getName());
+
+ /**
+ * list of encoded page of a column in a blocklet
+ */
+ private List<EncodedColumnPage> encodedColumnPageList;
+
+ /**
+ * fallback executor service
+ */
+ private ExecutorService fallbackExecutorService;
+
+ /**
+ * to check whether pages are local dictionary encoded or not
+ */
+ private boolean isLocalDictEncoded;
+
+ /**
+ * page level dictionary only when column is encoded with local
dictionary
+ */
+ private PageLevelDictionary pageLevelDictionary;
+
+ /**
+ * fallback future task queue;
+ */
+ private ArrayDeque<Future<FallbackEncodedColumnPage>>
fallbackFutureQueue;
+
+ BlockletEncodedColumnPage(ExecutorService fallbackExecutorService,
+ EncodedColumnPage encodedColumnPage) {
+ this.encodedColumnPageList = new ArrayList<>();
+ this.fallbackExecutorService = fallbackExecutorService;
+ this.encodedColumnPageList.add(encodedColumnPage);
+ // if dimension page is local dictionary enabled and encoded with
local dictionary
+ if (encodedColumnPage.isLocalDictionaryEnabled() && encodedColumnPage
+ .isLocalDictGeneratedPage()) {
+ this.isLocalDictEncoded = true;
+ // get first page dictionary
+ this.pageLevelDictionary = encodedColumnPage.getPageDictionary();
+ }
+ }
+
+ /**
+ * Below method will be used to add column page of a column
+ *
+ * @param encodedColumnPage
+ * encoded column page
+ * @throws ExecutionException
+ * failure in fallback
+ * @throws InterruptedException
+ * failure during fallback
+ */
+ void addEncodedColumnTable(EncodedColumnPage encodedColumnPage)
+ throws ExecutionException, InterruptedException {
+ // if local dictionary is false or column is encoded with local
dictionary then
+ // add a page
+ if (!isLocalDictEncoded ||
encodedColumnPage.isLocalDictGeneratedPage()) {
+ this.encodedColumnPageList.add(encodedColumnPage);
+ // merge page level dictionary values
+ if (null != this.pageLevelDictionary) {
+
pageLevelDictionary.mergerDictionaryValues(encodedColumnPage.getPageDictionary());
+ }
+ } else {
+ // if older pages where encoded with dictionary and new pages are
with dictionary
+ isLocalDictEncoded = false;
+ pageLevelDictionary = null;
+ this.fallbackFutureQueue = new ArrayDeque<>();
+ LOGGER.info(
+ "Local dictionary Fallback is initiated for column: " +
encodedColumnPageList.get(0)
+ .getActualPage().getColumnSpec().getFieldName());
+ // submit all the older pages encoded with dictionary for fallback
+ for (int pageIndex = 0; pageIndex < encodedColumnPageList.size();
pageIndex++) {
+ fallbackFutureQueue.add(fallbackExecutorService.submit(
+ new
FallbackColumnPageEncoder(encodedColumnPageList.get(pageIndex), pageIndex)));
+ }
+ //add to page list
+ this.encodedColumnPageList.add(encodedColumnPage);
+ }
+ }
+
+ /**
+ * Return the list of encoded page list for a column in a blocklet
+ *
+ * @return list of encoded page list
+ */
+ public List<EncodedColumnPage> getEncodedColumnPageList() {
+ // if fall back queue is empty then for some pages fallback was
initiated
--- End diff --
correct the comment
---