Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2869#discussion_r229563643
--- Diff:
hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import
org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import
org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import
org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * A specialized RecordReader that reads into CarbonColumnarBatches
directly using the
+ * carbondata column APIs and fills the data directly into columns.
+ */
+public class CarbonVectorizedRecordReader extends
AbstractRecordReader<Object> {
+
+ // Class-level logger; used by initialize() to report query execution failures.
+ private static final Logger LOGGER =
+
LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
+
+ // Reusable columnar batch; created lazily by initBatch() and refilled by nextBatch().
+ private CarbonColumnarBatch carbonColumnarBatch;
+
+ // Executor obtained from QueryExecutorFactory in initialize().
+ private QueryExecutor queryExecutor;
+
+ // Count of rows of the current batch already handed out; nextKeyValue() increments it
+ // and getCurrentValue() reads the row at (batchIdx - 1). Reset to 0 by nextBatch().
+ private int batchIdx = 0;
+
+ // Row count of the current batch, taken from carbonColumnarBatch.getActualSize().
+ private int numBatched = 0;
+
+ // Result iterator returned by queryExecutor.execute(queryModel) in initialize().
+ private AbstractDetailQueryResultIterator iterator;
+
+ // Query description supplied through the constructor; initialize() sets the block
+ // infos and the vector-reader flag on it before executing.
+ private QueryModel queryModel;
+
+ /**
+  * Creates a reader for the given query model. Nothing is executed here; the model is
+  * only stored, and initialize() later configures and executes it.
+  *
+  * @param queryModel the query model this reader will run
+  */
+ public CarbonVectorizedRecordReader(QueryModel queryModel) {
+ this.queryModel = queryModel;
+ }
+
+ @Override public void initialize(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ List<CarbonInputSplit> splitList;
+ if (inputSplit instanceof CarbonInputSplit) {
+ splitList = new ArrayList<>(1);
+ splitList.add((CarbonInputSplit) inputSplit);
+ } else {
+ throw new RuntimeException("unsupported input split type: " +
inputSplit);
+ }
+ List<TableBlockInfo> tableBlockInfoList =
CarbonInputSplit.createBlocks(splitList);
+ queryModel.setTableBlockInfos(tableBlockInfoList);
+ queryModel.setVectorReader(true);
+ try {
+ queryExecutor =
+ QueryExecutorFactory.getQueryExecutor(queryModel,
taskAttemptContext.getConfiguration());
+ iterator = (AbstractDetailQueryResultIterator)
queryExecutor.execute(queryModel);
+ } catch (QueryExecutionException e) {
+ LOGGER.error(e);
+ throw new InterruptedException(e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw e;
+ }
+ }
+
+ @Override public boolean nextKeyValue() throws IOException,
InterruptedException {
+ initBatch();
+ if (batchIdx >= numBatched) {
+ if (!nextBatch()) return false;
+ }
+ ++batchIdx;
+ return true;
+ }
+
+
+ /**
+  * Loads the next non-empty batch from the result iterator into carbonColumnarBatch.
+  *
+  * A batch whose actual size is 0 is skipped. Reporting such a batch as available would
+  * make nextKeyValue() advance batchIdx to 1 and getCurrentValue() read row 0 of a batch
+  * that holds no rows.
+  *
+  * @return true if a batch with at least one row was loaded (batchIdx is reset to 0),
+  *         false if the iterator is exhausted
+  */
+ private boolean nextBatch() {
+   carbonColumnarBatch.reset();
+   while (iterator.hasNext()) {
+     iterator.processNextBatch(carbonColumnarBatch);
+     numBatched = carbonColumnarBatch.getActualSize();
+     if (numBatched > 0) {
+       batchIdx = 0;
+       return true;
+     }
+     // Empty batch: clear it and try the next one.
+     carbonColumnarBatch.reset();
+   }
+   return false;
+ }
+
+ /**
+  * Lazily builds carbonColumnarBatch from the projection of the query model; does nothing
+  * once the batch exists.
+  *
+  * Each projected dimension and measure contributes one field, placed at the index given
+  * by its ordinal. Dimensions keep their own data type. Measures keep theirs when it is
+  * BOOLEAN, SHORT, INT, LONG, FLOAT or BYTE, get a DecimalType built from the measure's
+  * precision and scale when decimal, and are otherwise read as DOUBLE.
+  */
+ private void initBatch() {
+   if (carbonColumnarBatch != null) {
+     return;
+   }
+   List<ProjectionDimension> dimensions = queryModel.getProjectionDimensions();
+   List<ProjectionMeasure> measures = queryModel.getProjectionMeasures();
+   StructField[] schema = new StructField[dimensions.size() + measures.size()];
+
+   for (ProjectionDimension dimension : dimensions) {
+     schema[dimension.getOrdinal()] =
+         new StructField(dimension.getColumnName(), dimension.getDimension().getDataType());
+   }
+
+   for (ProjectionMeasure measure : measures) {
+     DataType measureType = measure.getMeasure().getDataType();
+     DataType vectorType;
+     if (measureType == DataTypes.BOOLEAN || measureType == DataTypes.SHORT
+         || measureType == DataTypes.INT || measureType == DataTypes.LONG
+         || measureType == DataTypes.FLOAT || measureType == DataTypes.BYTE) {
+       vectorType = measureType;
+     } else if (DataTypes.isDecimal(measureType)) {
+       vectorType = new DecimalType(
+           measure.getMeasure().getPrecision(), measure.getMeasure().getScale());
+     } else {
+       vectorType = DataTypes.DOUBLE;
+     }
+     schema[measure.getOrdinal()] = new StructField(measure.getColumnName(), vectorType);
+   }
+
+   // Vectors and batch share the same capacity constant.
+   int capacity = CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+   CarbonColumnVector[] columns = new CarbonColumnVector[schema.length];
+   for (int col = 0; col < columns.length; col++) {
+     columns[col] = new CarbonColumnVectorImpl(capacity, schema[col].getDataType());
+   }
+   carbonColumnarBatch = new CarbonColumnarBatch(columns, capacity, new boolean[] {});
+ }
+
+ @Override
+ public Object getCurrentValue() throws IOException, InterruptedException
{
+ rowCount += 1;
+ Object[] row = new Object[carbonColumnarBatch.columnVectors.length];
+ for (int i = 0; i < carbonColumnarBatch.columnVectors.length; i ++) {
+ if (carbonColumnarBatch.columnVectors[i].getType() ==
DataTypes.STRING
+ || carbonColumnarBatch.columnVectors[i].getType() ==
DataTypes.VARCHAR) {
+ byte[] data = (byte[])
carbonColumnarBatch.columnVectors[i].getData(batchIdx - 1);
+ row[i] = ByteUtil.toString(data, 0, data.length);
+ } else {
+ row[i] = carbonColumnarBatch.columnVectors[i].getData(batchIdx -
1);
+ }
+ }
+ return row;
+ }
+
+ @Override public Void getCurrentKey() throws IOException,
InterruptedException {
+ return null;
--- End diff --
VectorizedCarbonRecordReader is handled in the same way
---