Github user QiangCai commented on a diff in the pull request:

    
    https://github.com/apache/incubator-carbondata/pull/672#discussion_r107881262

    --- Diff: integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java ---
    @@ -0,0 +1,249 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.carbondata.hive;
    +
    +
    +import java.io.IOException;
    +import java.sql.Date;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.carbondata.core.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
    +import org.apache.carbondata.core.scan.model.QueryModel;
    +import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
    +import org.apache.carbondata.hadoop.CarbonRecordReader;
    +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hive.common.type.HiveDecimal;
    +import org.apache.hadoop.hive.serde.serdeConstants;
    +import org.apache.hadoop.hive.serde2.SerDeException;
    +import org.apache.hadoop.hive.serde2.io.DateWritable;
    +import org.apache.hadoop.hive.serde2.io.DoubleWritable;
    +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
    +import org.apache.hadoop.hive.serde2.io.ShortWritable;
    +import org.apache.hadoop.hive.serde2.io.TimestampWritable;
    +import org.apache.hadoop.hive.serde2.objectinspector.*;
    +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
    +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    +import org.apache.hadoop.io.ArrayWritable;
    +import org.apache.hadoop.io.IntWritable;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.Writable;
    +import org.apache.hadoop.mapred.InputSplit;
    +import org.apache.hadoop.mapred.JobConf;
    +
    +public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
    +    implements org.apache.hadoop.mapred.RecordReader<Void, ArrayWritable> {
    +
    +  ArrayWritable valueObj = null;
    +  private CarbonObjectInspector objInspector;
    +
    +  public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
    +                                InputSplit inputSplit, JobConf jobConf) throws IOException {
    +    super(queryModel, readSupport);
    +    initialize(inputSplit, jobConf);
    +  }
    +
    +  public void initialize(InputSplit inputSplit, Configuration conf) throws IOException {
    +    // The input split can contain single HDFS block or multiple blocks, so firstly get all the
    +    // blocks and then set them in the query model.
    +    List<CarbonHiveInputSplit> splitList;
    +    if (inputSplit instanceof CarbonHiveInputSplit) {
    +      splitList = new ArrayList<>(1);
    +      splitList.add((CarbonHiveInputSplit) inputSplit);
    +    } else {
    +      throw new RuntimeException("unsupported input split type: " + inputSplit);
    +    }
    +    List<TableBlockInfo> tableBlockInfoList = CarbonHiveInputSplit.createBlocks(splitList);
    +    queryModel.setTableBlockInfos(tableBlockInfoList);
    +    readSupport.initialize(queryModel.getProjectionColumns(),
    +        queryModel.getAbsoluteTableIdentifier());
    +    try {
    +      carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
    +    } catch (QueryExecutionException e) {
    +      throw new IOException(e.getMessage(), e.getCause());
    +    }
    +    if (valueObj == null) {
    +      valueObj = new ArrayWritable(Writable.class,
    +          new Writable[queryModel.getProjectionColumns().length]);
    +    }
    +
    +    final TypeInfo rowTypeInfo;
    +    final List<String> columnNames;
    +    List<TypeInfo> columnTypes;
    +    // Get column names and sort order
    +    final String columnNameProperty = conf.get("hive.io.file.readcolumn.names");
    +    final String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
    +
    +    if (columnNameProperty.length() == 0) {
    +      columnNames = new ArrayList<String>();
    +    } else {
    +      columnNames = Arrays.asList(columnNameProperty.split(","));
    +    }
    +    if (columnTypeProperty.length() == 0) {
    +      columnTypes = new ArrayList<TypeInfo>();
    +    } else {
    +      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
    +    }
    +    columnTypes = columnTypes.subList(0, columnNames.size());
    +    // Create row related objects
    +    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    +    this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
    +  }
    +
    +  @Override
    +  public boolean next(Void aVoid, ArrayWritable value) throws IOException {
    +    if (carbonIterator.hasNext()) {
    +      Object obj = readSupport.readRow(carbonIterator.next());
    +      ArrayWritable tmpValue = null;
    +      try {
    +        tmpValue = createArrayWritable(obj);
    +      } catch (SerDeException se) {
    +        throw new IOException(se.getMessage(), se.getCause());
    +      }
    +
    +      if (valueObj != tmpValue) {
    --- End diff --
    
    The result of this condition is always true: `createArrayWritable(obj)` produces a new object, so the reference comparison with `valueObj` can never be false.
    Did you mean to check whether all the columns are null instead?
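    For example, a minimal sketch of such a check (the `allColumnsNull` helper name is made up here for illustration and is not part of this PR):

        // Returns true only when every column of the row is null.
        private static boolean allColumnsNull(ArrayWritable row) {
          for (Writable column : row.get()) {
            if (column != null) {
              return false;
            }
          }
          return true;
        }

    The branch above could then read `if (!allColumnsNull(tmpValue))` instead of a reference comparison that never fails.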

