Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2374#discussion_r195687377
  
    --- Diff: 
hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonCsvRecordReader.java ---
    @@ -0,0 +1,663 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.hadoop;
    +
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.Reader;
    +import java.math.BigInteger;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.common.logging.LogService;
    +import org.apache.carbondata.common.logging.LogServiceFactory;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.datastore.block.SegmentProperties;
    +import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
    +import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
    +import org.apache.carbondata.core.metadata.datatype.DataType;
    +import org.apache.carbondata.core.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
    +import 
org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
    +import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
    +import org.apache.carbondata.core.scan.filter.FilterUtil;
    +import org.apache.carbondata.core.scan.filter.GenericQueryType;
    +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
    +import org.apache.carbondata.core.scan.filter.intf.RowImpl;
    +import org.apache.carbondata.core.scan.filter.intf.RowIntf;
    +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.statusmanager.FileFormatProperties;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
    +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
    +
    +import com.univocity.parsers.csv.CsvParser;
    +import com.univocity.parsers.csv.CsvParserSettings;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +import org.apache.spark.sql.types.CalendarIntervalType;
    +import org.apache.spark.sql.types.DataTypes;
    +import org.apache.spark.sql.types.Decimal;
    +import org.apache.spark.sql.types.DecimalType;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +import org.apache.spark.unsafe.types.CalendarInterval;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +/**
    + * scan csv file and filter on it
    + */
    +public class CarbonCsvRecordReader<T> extends AbstractRecordReader<T> {
    +  private static final LogService LOGGER = LogServiceFactory.getLogService(
    +      CarbonCsvRecordReader.class.getName());
    +  private static final int MAX_BATCH_SIZE = 32000;
    +
    +  // vector reader
    +  private boolean isVectorReader;
    +  private ColumnarBatch columnarBatch;
    +  private StructType outputSchema;
    +
    +  // metadata
    +  private CarbonTable carbonTable;
    +  private CarbonColumn[] carbonColumns;
    +  // input
    +  private QueryModel queryModel;
    +  private FileSplit fileSplit;
    +  private Configuration hadoopConf;
    +  // the index is schema ordinal, the value is the csv ordinal
    +  private int[] schema2csvIdx;
    +
    +  // filter
    +  private FilterExecuter filter;
    +  // the index is the dimension ordinal, the value is the schema ordinal
    +  private int[] filterColumn2SchemaIdx;
    +  private Object[] internalValues;
    +  private RowIntf internalRow;
    +
    +  // output
    +  private CarbonColumn[] projection;
    +  // the index is the projection column ordinal, the value is the schema 
ordinal
    +  private int[] projectionColumn2SchemaIdx;
    +  private Object[] outputValues;
    +  private Object[] finalOutputValues;
    +  private InternalRow outputRow;
    +
    +  // inputMetricsStats
    +  private InputMetricsStats inputMetricsStats;
    +
    +  // scan
    +  private Reader reader;
    +  private CsvParser csvParser;
    +
    +  public CarbonCsvRecordReader(QueryModel queryModel) {
    +    this.queryModel = queryModel;
    +  }
    +
    +  public CarbonCsvRecordReader(QueryModel queryModel, InputMetricsStats 
inputMetricsStats) {
    +    this(queryModel);
    +    this.inputMetricsStats = inputMetricsStats;
    +  }
    +
    +  public boolean isVectorReader() {
    +    return isVectorReader;
    +  }
    +
    +  public void setVectorReader(boolean vectorReader) {
    +    isVectorReader = vectorReader;
    +  }
    +
    +  public void setQueryModel(QueryModel queryModel) {
    +    this.queryModel = queryModel;
    +  }
    +
    +  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
    +    this.inputMetricsStats = inputMetricsStats;
    +  }
    +
    +  @Override
    +  public void initialize(InputSplit split, TaskAttemptContext context)
    +      throws IOException, InterruptedException {
    +    if (split instanceof CarbonInputSplit) {
    +      fileSplit = (CarbonInputSplit) split;
    +    } else if (split instanceof CarbonMultiBlockSplit) {
    +      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
    +    } else {
    +      fileSplit = (FileSplit) split;
    +    }
    +
    +    hadoopConf = context.getConfiguration();
    +    if (queryModel == null) {
    +      CarbonTableInputFormat inputFormat = new 
CarbonTableInputFormat<Object>();
    +      queryModel = inputFormat.createQueryModel(split, context);
    +    }
    +
    +    carbonTable = queryModel.getTable();
    +
    +    // since the sequence of csv header, schema, carbon internal row, 
projection are different,
    +    // we need to init the column mappings
    +    initializedIdxMapping();
    +
    +    // init filter
    +    if (null != queryModel.getFilterExpressionResolverTree()) {
    +      initializeFilter();
    +    }
    +
    +    // init reading
    +    initializeCsvReader();
    +  }
    +
    +  private void initializedIdxMapping() {
    +    carbonColumns =
    +        
carbonTable.getCreateOrderColumn(carbonTable.getTableName()).toArray(new 
CarbonColumn[0]);
    +    // for schema to csv mapping
    +    schema2csvIdx = new int[carbonColumns.length];
    +    if (!carbonTable.getTableInfo().getFormatProperties().containsKey(
    +        FileFormatProperties.CSV.HEADER)) {
    +      // if no header specified, it means that they are the same
    +      LOGGER.info("no header specified, will take the schema from table as 
header");
    +      for (int i = 0; i < carbonColumns.length; i++) {
    +        schema2csvIdx[i] = i;
    +      }
    +    } else {
    +      String[] csvHeader = 
carbonTable.getTableInfo().getFormatProperties().get(
    +          FileFormatProperties.CSV.HEADER).split(",");
    +      for (int i = 0; i < csvHeader.length; i++) {
    +        boolean found = false;
    +        for (int j = 0; j < carbonColumns.length; j++) {
    +          if 
(StringUtils.strip(csvHeader[i]).equalsIgnoreCase(carbonColumns[j].getColName()))
 {
    +            schema2csvIdx[carbonColumns[j].getSchemaOrdinal()] = i;
    +            found = true;
    +            break;
    +          }
    +        }
    +        if (!found) {
    +          throw new RuntimeException(
    +              String.format("Can not find csv header '%s' in table 
fields", csvHeader[i]));
    +        }
    +      }
    +    }
    +
    +    // for carbon internal row to schema mapping
    +    filterColumn2SchemaIdx = new int[carbonColumns.length];
    +    int filterIdx = 0;
    +    for (CarbonDimension dimension : carbonTable.getDimensions()) {
    +      filterColumn2SchemaIdx[filterIdx++] = dimension.getSchemaOrdinal();
    +    }
    +    for (CarbonMeasure measure : carbonTable.getMeasures()) {
    +      filterColumn2SchemaIdx[filterIdx++] = measure.getSchemaOrdinal();
    +    }
    +
    +    // for projects to schema mapping
    +    projection = queryModel.getProjectionColumns();
    +    projectionColumn2SchemaIdx = new int[projection.length];
    +
    +    for (int i = 0; i < projection.length; i++) {
    +      for (int j = 0; j < carbonColumns.length; j++) {
    +        if 
(projection[i].getColName().equals(carbonColumns[j].getColName())) {
    +          projectionColumn2SchemaIdx[i] = projection[i].getSchemaOrdinal();
    +          break;
    +        }
    +      }
    +    }
    +  }
    +
    +  private void initializeFilter() {
    +    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
    +        
.getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
    +            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
    +    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
    +    for (int i = 0; i < dimLensWithComplex.length; i++) {
    +      dimLensWithComplex[i] = Integer.MAX_VALUE;
    +    }
    +
    +    int[] dictionaryColumnCardinality =
    +        CarbonUtil.getFormattedCardinality(dimLensWithComplex, 
wrapperColumnSchemaList);
    +    SegmentProperties segmentProperties =
    +        new SegmentProperties(wrapperColumnSchemaList, 
dictionaryColumnCardinality);
    +    Map<Integer, GenericQueryType> complexDimensionInfoMap = new 
HashMap<>();
    +
    +    FilterResolverIntf resolverIntf = 
queryModel.getFilterExpressionResolverTree();
    +    filter = FilterUtil.getFilterExecuterTree(resolverIntf, 
segmentProperties,
    +        complexDimensionInfoMap);
    +    // for row filter, we need update column index
    +    
FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
    +        carbonTable.getDimensionOrdinalMax());
    +  }
    +
    +  private void initializeCsvReader() throws IOException {
    +    internalValues = new Object[carbonColumns.length];
    +    internalRow = new RowImpl();
    +    internalRow.setValues(internalValues);
    +
    +    outputValues = new Object[projection.length];
    +    finalOutputValues = new Object[projection.length];
    +    outputRow = new GenericInternalRow(outputValues);
    +
    +    Path file = fileSplit.getPath();
    +    FileSystem fs = file.getFileSystem(hadoopConf);
    +    int bufferSize = Integer.parseInt(
    +        hadoopConf.get(CSVInputFormat.READ_BUFFER_SIZE, 
CSVInputFormat.READ_BUFFER_SIZE_DEFAULT));
    +    // note that here we read the whole file, not a split of the file
    +    FSDataInputStream fsStream = fs.open(file, bufferSize);
    +    reader = new InputStreamReader(fsStream, 
CarbonCommonConstants.DEFAULT_CHARSET);
    +    // use default csv settings first, then update it using user specified 
properties later
    +    CsvParserSettings settings = 
CSVInputFormat.extractCsvParserSettings(hadoopConf);
    +    initCsvSettings(settings);
    +    csvParser = new CsvParser(settings);
    +    csvParser.beginParsing(reader);
    +
    +    outputSchema = new StructType(convertCarbonColumnSpark(projection));
    +  }
    +
    +  /**
    +   * update the settings using properties from user
    +   */
    +  private void initCsvSettings(CsvParserSettings settings) {
    +    Map<String, String> csvProperties = 
carbonTable.getTableInfo().getFormatProperties();
    +
    +    if (csvProperties.containsKey(FileFormatProperties.CSV.DELIMITER)) {
    +      settings.getFormat().setDelimiter(
    +          csvProperties.get(FileFormatProperties.CSV.DELIMITER).charAt(0));
    +    }
    +
    +    if (csvProperties.containsKey(FileFormatProperties.CSV.COMMENT)) {
    +      settings.getFormat().setComment(
    +          csvProperties.get(FileFormatProperties.CSV.COMMENT).charAt(0));
    +    }
    +
    +    if (csvProperties.containsKey(FileFormatProperties.CSV.QUOTE)) {
    +      settings.getFormat().setQuote(
    +          csvProperties.get(FileFormatProperties.CSV.QUOTE).charAt(0));
    +    }
    +
    +    if (csvProperties.containsKey(FileFormatProperties.CSV.ESCAPE)) {
    +      settings.getFormat().setQuoteEscape(
    +          csvProperties.get(FileFormatProperties.CSV.ESCAPE).charAt(0));
    +    }
    +
    +    if 
(csvProperties.containsKey(FileFormatProperties.CSV.SKIP_EMPTY_LINE)) {
    +      settings.setSkipEmptyLines(
    +          
Boolean.parseBoolean(csvProperties.get(FileFormatProperties.CSV.SKIP_EMPTY_LINE)));
    +    }
    +  }
    +
    +  private StructField[] convertCarbonColumnSpark(CarbonColumn[] columns) {
    +    StructField[] fields = new StructField[columns.length];
    +    for (int i = 0; i < columns.length; i++) {
    +      CarbonColumn carbonColumn = columns[i];
    +      fields[i] = new StructField(carbonColumn.getColName(),
    +          getSparkType4CarbonColumn(carbonColumn), true, null);
    +    }
    +    return fields;
    +  }
    +
    +  @Override
    +  public boolean nextKeyValue() throws IOException, InterruptedException {
    +    if (isVectorReader) {
    +      return nextColumnarBatch();
    +    }
    +
    +    return nextRow();
    +  }
    +
    +  private org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
    --- End diff --
    
    You can add ReadSupport for upper layer to convert the data type instead of 
adding it here


---

Reply via email to