Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r229563650
--- Diff:
hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.util;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.DecimalType;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.scan.executor.QueryExecutor;
import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.hadoop.AbstractRecordReader;
import org.apache.carbondata.hadoop.CarbonInputSplit;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
+
+/**
+ * A specialized RecordReader that reads into CarbonColumnarBatches
directly using the
+ * carbondata column APIs and fills the data directly into columns.
+ */
+public class CarbonVectorizedRecordReader extends
AbstractRecordReader<Object> {
+
  private static final Logger LOGGER =
      LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());

  // Columnar batch that query results are filled into (population happens outside this excerpt).
  private CarbonColumnarBatch carbonColumnarBatch;

  // Executor created in initialize(); runs the carbon query for the assigned split.
  private QueryExecutor queryExecutor;

  // NOTE(review): presumably the read cursor within the current batch — confirm in nextKeyValue().
  private int batchIdx = 0;

  // NOTE(review): presumably the row count of the current batch — confirm where batches are filled.
  private int numBatched = 0;

  // Vector result iterator returned by queryExecutor.execute() in initialize().
  private AbstractDetailQueryResultIterator iterator;

  // Query model supplied via the constructor; block infos / vector mode are set in initialize().
  private QueryModel queryModel;
+
+ public CarbonVectorizedRecordReader(QueryModel queryModel) {
+ this.queryModel = queryModel;
+ }
+
+ @Override public void initialize(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ List<CarbonInputSplit> splitList;
+ if (inputSplit instanceof CarbonInputSplit) {
+ splitList = new ArrayList<>(1);
+ splitList.add((CarbonInputSplit) inputSplit);
+ } else {
+ throw new RuntimeException("unsupported input split type: " +
inputSplit);
+ }
+ List<TableBlockInfo> tableBlockInfoList =
CarbonInputSplit.createBlocks(splitList);
+ queryModel.setTableBlockInfos(tableBlockInfoList);
+ queryModel.setVectorReader(true);
+ try {
+ queryExecutor =
+ QueryExecutorFactory.getQueryExecutor(queryModel,
taskAttemptContext.getConfiguration());
+ iterator = (AbstractDetailQueryResultIterator)
queryExecutor.execute(queryModel);
+ } catch (QueryExecutionException e) {
+ LOGGER.error(e);
+ throw new InterruptedException(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw e;
+ }
+ }
+
+ @Override public boolean nextKeyValue() throws IOException,
InterruptedException {
+ initBatch();
--- End diff --
done
---