[CARBONDATA-2613] Support csv based carbon table

1. Create a csv based carbon table using
CREATE TABLE fact_table (col1 bigint, col2 string, ..., col100 string)
STORED BY 'CarbonData'
TBLPROPERTIES(
  'format'='csv',
  'csv.delimiter'=',',
  'csv.header'='col1,col2,col100')

2. Load data into this table using
ALTER TABLE fact_table ADD SEGMENT LOCATION 'path/to/data1'

This closes #2374


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e4bfb570
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e4bfb570
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e4bfb570

Branch: refs/heads/carbonstore
Commit: e4bfb5701b018d742b62e08dedc9760489d7d241
Parents: d0fa523
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Wed Jun 13 09:03:28 2018 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Wed Jul 11 16:00:43 2018 +0800

----------------------------------------------------------------------
 .../carbondata/common/annotations/Since.java    |  38 ++
 .../core/constants/CarbonCommonConstants.java   |   4 +
 .../core/metadata/schema/table/TableInfo.java   |  52 ++
 .../core/statusmanager/FileFormat.java          |  10 +-
 .../statusmanager/FileFormatProperties.java     |  32 ++
 .../core/statusmanager/LoadMetadataDetails.java |  16 +-
 .../hadoop/CarbonMultiBlockSplit.java           |   6 +
 .../carbondata/hadoop/CsvRecordReader.java      | 506 +++++++++++++++++++
 .../hadoop/api/CarbonFileInputFormat.java       |  11 +-
 .../hadoop/api/CarbonInputFormat.java           | 120 ++++-
 .../hadoop/api/CarbonTableInputFormat.java      |  12 +-
 .../datawithoutheader_delimiter_separator.csv   |  10 +
 .../CsvBasedCarbonTableSuite.scala              | 244 +++++++++
 .../carbondata/spark/format/CsvReadSupport.java | 107 ++++
 .../spark/format/VectorCsvReadSupport.java      | 130 +++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  77 ++-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   1 +
 .../command/carbonTableSchemaCommon.scala       |  10 +
 .../management/CarbonAddSegmentCommand.scala    | 135 +++++
 .../sql/parser/CarbonSpark2SqlParser.scala      |  13 +-
 20 files changed, 1499 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/common/src/main/java/org/apache/carbondata/common/annotations/Since.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/annotations/Since.java b/common/src/main/java/org/apache/carbondata/common/annotations/Since.java
new file mode 100644
index 0000000..b7e4391
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/annotations/Since.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation indicates the version number since which a member or a type
+ * has been present.
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.TYPE, ElementType.METHOD})
+public @interface Since {
+  /**
+   * the version number since which this member
+   * or type has been present.
+   */
+  String value();
+}

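For reference, a minimal usage sketch of the annotation defined above; the
enclosing class is hypothetical, while the field mirrors how this commit
applies @Since in TableInfo:

  import org.apache.carbondata.common.annotations.Since;

  public class ExampleMetadata {
    // records the release in which this member first appeared
    @Since("1.4.1")
    private String format = "carbondata";
  }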
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ff6b358..ffa2e35 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -947,6 +947,8 @@ public final class CarbonCommonConstants {
    */
   public static final String DICTIONARY_PATH = "dictionary_path";
   public static final String SORT_COLUMNS = "sort_columns";
+  // file format for the data files
+  public static final String FORMAT = "format";
   public static final String PARTITION_TYPE = "partition_type";
   public static final String NUM_PARTITIONS = "num_partitions";
   public static final String RANGE_INFO = "range_info";
@@ -969,6 +971,8 @@ public final class CarbonCommonConstants {
  // Flat folder support on table. when it is true all carbondata files store directly under table
   // path instead of sub folders.
   public static final String FLAT_FOLDER = "flat_folder";
+  // this will be used in hadoop conf to pass the format type to the executor
+  public static final String CARBON_EXTERNAL_FORMAT_CONF_KEY = "carbon_external_format_type";
 
   /**
    * 16 mb size

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 38145e5..46328f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -25,11 +25,13 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.carbondata.common.annotations.Since;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -89,6 +91,17 @@ public class TableInfo implements Serializable, Writable {
    *
    */
   private boolean isTransactionalTable = true;
+  /**
+   * The format of the fact table.
+   * By default it is carbondata; other formats such as CSV are also supported.
+   */
+  @Since("1.4.1")
+  private String format = "carbondata";
+  /**
+   * properties for the format, such as delimiter/header for csv format
+   */
+  @Since("1.4.1")
+  private Map<String, String> formatProperties;
 
  // this identifier is a lazy field which will be created when it is used first time
   private AbsoluteTableIdentifier identifier;
@@ -104,6 +117,7 @@ public class TableInfo implements Serializable, Writable {
 
   public TableInfo() {
     dataMapSchemaList = new ArrayList<>();
+    formatProperties = new HashMap<>();
     isTransactionalTable = true;
   }
 
@@ -196,6 +210,22 @@ public class TableInfo implements Serializable, Writable {
     this.tablePath = tablePath;
   }
 
+  public String getFormat() {
+    return format;
+  }
+
+  public void setFormat(String format) {
+    this.format = format;
+  }
+
+  public Map<String, String> getFormatProperties() {
+    return formatProperties;
+  }
+
+  public void setFormatProperties(Map<String, String> formatProperties) {
+    this.formatProperties = formatProperties;
+  }
+
   public List<DataMapSchema> getDataMapSchemaList() {
     return dataMapSchemaList;
   }
@@ -291,6 +321,17 @@ public class TableInfo implements Serializable, Writable {
       }
     }
     out.writeBoolean(isSchemaModified);
+
+    out.writeUTF(format);
+    boolean isFormatPropertiesExists = null != formatProperties && formatProperties.size() > 0;
+    out.writeBoolean(isFormatPropertiesExists);
+    if (isFormatPropertiesExists) {
+      out.writeShort(formatProperties.size());
+      for (Map.Entry<String, String> entry : formatProperties.entrySet()) {
+        out.writeUTF(entry.getKey());
+        out.writeUTF(entry.getValue());
+      }
+    }
   }
 
   @Override public void readFields(DataInput in) throws IOException {
@@ -327,6 +368,17 @@ public class TableInfo implements Serializable, Writable {
       }
     }
     this.isSchemaModified = in.readBoolean();
+
+    this.format = in.readUTF();
+    boolean isFormatPropertiesExists = in.readBoolean();
+    if (isFormatPropertiesExists) {
+      short size = in.readShort();
+      for (int i = 0; i < size; i++) {
+        String key = in.readUTF();
+        String value = in.readUTF();
+        this.formatProperties.put(key, value);
+      }
+    }
   }
 
   public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {

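A minimal round-trip sketch (not part of this patch) of the Writable methods
above, assuming a TableInfo whose remaining serialized fields are already
populated; format and formatProperties survive the round trip:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  import org.apache.carbondata.core.metadata.schema.table.TableInfo;

  public class TableInfoRoundTrip {
    // serializes a fully populated TableInfo and reads it back
    static TableInfo roundTrip(TableInfo in) throws IOException {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      in.write(new DataOutputStream(bos));
      TableInfo out = new TableInfo();
      out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
      return out;
    }
  }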
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index c154c5f..2b61f0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -18,7 +18,8 @@
 package org.apache.carbondata.core.statusmanager;
 
 /**
- * The data file format supported in carbondata project
+ * The data file format supported in carbondata project.
+ * The file format, along with its properties, is stored in TableInfo.
  */
 public enum FileFormat {
 
@@ -26,7 +27,10 @@ public enum FileFormat {
   COLUMNAR_V3,
 
   // carbondata row file format, optimized for write
-  ROW_V1;
+  ROW_V1,
+
+  // external file format, such as parquet/csv
+  EXTERNAL;
 
   public static FileFormat getByOrdinal(int ordinal) {
     if (ordinal < 0 || ordinal >= FileFormat.values().length) {
@@ -38,6 +42,8 @@ public enum FileFormat {
         return COLUMNAR_V3;
       case 1:
         return ROW_V1;
+      case 2:
+        return EXTERNAL;
     }
 
     return COLUMNAR_V3;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java
new file mode 100644
index 0000000..862c36c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.statusmanager;
+
+/**
+ * Provides the constant names for the file format properties
+ */
+public class FileFormatProperties {
+  public static class CSV {
+    public static final String HEADER = "csv.header";
+    public static final String DELIMITER = "csv.delimiter";
+    public static final String COMMENT = "csv.comment";
+    public static final String SKIP_EMPTY_LINE = "csv.skipemptyline";
+    public static final String QUOTE = "csv.quote";
+    public static final String ESCAPE = "csv.escape";
+  }
+}

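A sketch (not part of this patch) of how these keys end up in TableInfo; the
property values are illustrative:

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.carbondata.core.metadata.schema.table.TableInfo;
  import org.apache.carbondata.core.statusmanager.FileFormatProperties;

  public class CsvFormatSetup {
    // mirrors what the DDL layer does for
    // TBLPROPERTIES('format'='csv', 'csv.delimiter'='|', 'csv.header'='col1,col2,col100')
    static TableInfo newCsvTableInfo() {
      Map<String, String> props = new HashMap<>();
      props.put(FileFormatProperties.CSV.DELIMITER, "|");
      props.put(FileFormatProperties.CSV.HEADER, "col1,col2,col100");
      TableInfo tableInfo = new TableInfo();
      tableInfo.setFormat("csv");
      tableInfo.setFormatProperties(props);
      return tableInfo;
    }
  }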
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 9dc8fe6..4339e34 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -122,6 +122,11 @@ public class LoadMetadataDetails implements Serializable {
    * the file format of this segment
    */
   private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
+  /**
+   * path of the fact files.
+   * Since format and formatProperties are stored in tableInfo, we do not store them in each segment
+   */
+  private String factFilePath;
 
   /**
    * Segment file name where it has the information of partition information.
@@ -429,8 +434,17 @@ public class LoadMetadataDetails implements Serializable {
     this.segmentFile = segmentFile;
   }
 
+  public String getFactFilePath() {
+    return factFilePath;
+  }
+
+  public void setFactFilePath(String factFilePath) {
+    this.factFilePath = factFilePath;
+  }
+
   @Override public String toString() {
     return "LoadMetadataDetails{" + "loadStatus=" + loadStatus + ", 
loadName='" + loadName + '\''
-        + ", loadStartTime='" + loadStartTime + '\'' + ", segmentFile='" + 
segmentFile + '\'' + '}';
+        + ", loadStartTime='" + loadStartTime + '\'' + ", factFilePath='" + 
factFilePath + '\''
+        + ", segmentFile='" + segmentFile + '\'' + '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index 0b991cb..ae11cf2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -64,12 +64,18 @@ public class CarbonMultiBlockSplit extends InputSplit implements Serializable, W
       this.splitList.add((CarbonInputSplit)block);
     }
     this.locations = new String[]{hostname};
+    if (splitList.size() > 0) {
+      this.fileFormat = splitList.get(0).getFileFormat();
+    }
   }
 
   public CarbonMultiBlockSplit(List<CarbonInputSplit> splitList,
       String[] locations) {
     this.splitList = splitList;
     this.locations = locations;
+    if (splitList.size() > 0) {
+      this.fileFormat = splitList.get(0).getFileFormat();
+    }
     calculateLength();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java
new file mode 100644
index 0000000..70f58c3
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.statusmanager.FileFormatProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * Scans a csv file and applies filters on it.
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.Internal
+public class CsvRecordReader<T> extends AbstractRecordReader<T> {
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      CsvRecordReader.class.getName());
+  private static final int MAX_BATCH_SIZE =
+      CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+  // vector reader
+  private boolean isVectorReader;
+  private T columnarBatch;
+
+  // metadata
+  private CarbonTable carbonTable;
+  private CarbonColumn[] carbonColumns;
+  // input
+  private QueryModel queryModel;
+  private CarbonReadSupport<T> readSupport;
+  private FileSplit fileSplit;
+  private Configuration hadoopConf;
+  // the index is schema ordinal, the value is the csv ordinal
+  private int[] schema2csvIdx;
+
+  // filter
+  private FilterExecuter filter;
+  // the index is the dimension ordinal, the value is the schema ordinal
+  private int[] filterColumn2SchemaIdx;
+  private Object[] internalValues;
+  private RowIntf internalRow;
+
+  // output
+  private CarbonColumn[] projection;
+  // the index is the projection column ordinal, the value is the schema ordinal
+  private int[] projectionColumn2SchemaIdx;
+  private Object[] outputValues;
+  private Object[][] batchOutputValues;
+  private T outputRow;
+
+  // inputMetricsStats
+  private InputMetricsStats inputMetricsStats;
+
+  // scan
+  private Reader reader;
+  private CsvParser csvParser;
+
+  public CsvRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
+    this.queryModel = queryModel;
+    this.readSupport = readSupport;
+  }
+
+  public CsvRecordReader(QueryModel queryModel, CarbonReadSupport readSupport,
+      InputMetricsStats inputMetricsStats) {
+    this(queryModel, readSupport);
+    this.inputMetricsStats = inputMetricsStats;
+  }
+
+  public boolean isVectorReader() {
+    return isVectorReader;
+  }
+
+  public void setVectorReader(boolean vectorReader) {
+    isVectorReader = vectorReader;
+  }
+
+  public void setQueryModel(QueryModel queryModel) {
+    this.queryModel = queryModel;
+  }
+
+  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
+    this.inputMetricsStats = inputMetricsStats;
+  }
+
+  public void setReadSupport(CarbonReadSupport<T> readSupport) {
+    this.readSupport = readSupport;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    if (split instanceof CarbonInputSplit) {
+      fileSplit = (CarbonInputSplit) split;
+    } else if (split instanceof CarbonMultiBlockSplit) {
+      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
+    } else {
+      fileSplit = (FileSplit) split;
+    }
+
+    hadoopConf = context.getConfiguration();
+    if (queryModel == null) {
+      CarbonTableInputFormat inputFormat = new CarbonTableInputFormat<Object>();
+      queryModel = inputFormat.createQueryModel(split, context);
+    }
+
+    carbonTable = queryModel.getTable();
+
+    // since the sequences of the csv header, schema, carbon internal row and projection differ,
+    // we need to init the column mappings
+    initializedIdxMapping();
+
+    // init filter
+    if (null != queryModel.getFilterExpressionResolverTree()) {
+      initializeFilter();
+    }
+
+    // init reading
+    initializeCsvReader();
+
+    this.readSupport.initialize(projection, carbonTable);
+  }
+
+  private void initializedIdxMapping() {
+    carbonColumns =
+        carbonTable.getCreateOrderColumn(carbonTable.getTableName()).toArray(new CarbonColumn[0]);
+    // for schema to csv mapping
+    schema2csvIdx = new int[carbonColumns.length];
+    if (!carbonTable.getTableInfo().getFormatProperties().containsKey(
+        FileFormatProperties.CSV.HEADER)) {
+      // if no header specified, it means that they are the same
+      LOGGER.info("no header specified, will take the schema from table as 
header");
+      for (int i = 0; i < carbonColumns.length; i++) {
+        schema2csvIdx[i] = i;
+      }
+    } else {
+      String[] csvHeader = carbonTable.getTableInfo().getFormatProperties().get(
+          FileFormatProperties.CSV.HEADER).split(",");
+      for (int i = 0; i < csvHeader.length; i++) {
+        boolean found = false;
+        for (int j = 0; j < carbonColumns.length; j++) {
+          if (StringUtils.strip(csvHeader[i]).equalsIgnoreCase(carbonColumns[j].getColName())) {
+            schema2csvIdx[carbonColumns[j].getSchemaOrdinal()] = i;
+            found = true;
+            break;
+          }
+        }
+        if (!found) {
+          throw new RuntimeException(
+              String.format("Can not find csv header '%s' in table fields", 
csvHeader[i]));
+        }
+      }
+    }
+
+    // for carbon internal row to schema mapping
+    filterColumn2SchemaIdx = new int[carbonColumns.length];
+    int filterIdx = 0;
+    for (CarbonDimension dimension : carbonTable.getDimensions()) {
+      filterColumn2SchemaIdx[filterIdx++] = dimension.getSchemaOrdinal();
+    }
+    for (CarbonMeasure measure : carbonTable.getMeasures()) {
+      filterColumn2SchemaIdx[filterIdx++] = measure.getSchemaOrdinal();
+    }
+
+    // for projection to schema mapping
+    projection = queryModel.getProjectionColumns();
+    projectionColumn2SchemaIdx = new int[projection.length];
+
+    for (int i = 0; i < projection.length; i++) {
+      for (int j = 0; j < carbonColumns.length; j++) {
+        if (projection[i].getColName().equals(carbonColumns[j].getColName())) {
+          projectionColumn2SchemaIdx[i] = projection[i].getSchemaOrdinal();
+          break;
+        }
+      }
+    }
+  }
+
+  private void initializeFilter() {
+    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+    for (int i = 0; i < dimLensWithComplex.length; i++) {
+      dimLensWithComplex[i] = Integer.MAX_VALUE;
+    }
+
+    int[] dictionaryColumnCardinality =
+        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+    SegmentProperties segmentProperties =
+        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
+    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
+
+    FilterResolverIntf resolverIntf = queryModel.getFilterExpressionResolverTree();
+    filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
+        complexDimensionInfoMap);
+    // for row filter, we need to update the column index
+    FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
+        carbonTable.getDimensionOrdinalMax());
+  }
+
+  private void initializeCsvReader() throws IOException {
+    internalValues = new Object[carbonColumns.length];
+    internalRow = new RowImpl();
+    internalRow.setValues(internalValues);
+
+    outputValues = new Object[projection.length];
+    batchOutputValues = new Object[MAX_BATCH_SIZE][projection.length];
+
+    Path file = fileSplit.getPath();
+    FileSystem fs = file.getFileSystem(hadoopConf);
+    int bufferSize = Integer.parseInt(
+        hadoopConf.get(CSVInputFormat.READ_BUFFER_SIZE, CSVInputFormat.READ_BUFFER_SIZE_DEFAULT));
+    // note that here we read the whole file, not a split of the file
+    FSDataInputStream fsStream = fs.open(file, bufferSize);
+    reader = new InputStreamReader(fsStream, CarbonCommonConstants.DEFAULT_CHARSET);
+    // use default csv settings first, then update them with user-specified properties
+    CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(hadoopConf);
+    initCsvSettings(settings);
+    csvParser = new CsvParser(settings);
+    csvParser.beginParsing(reader);
+  }
+
+  /**
+   * update the settings using properties from user
+   */
+  private void initCsvSettings(CsvParserSettings settings) {
+    Map<String, String> csvProperties = carbonTable.getTableInfo().getFormatProperties();
+
+    if (csvProperties.containsKey(FileFormatProperties.CSV.DELIMITER)) {
+      settings.getFormat().setDelimiter(
+          csvProperties.get(FileFormatProperties.CSV.DELIMITER).charAt(0));
+    }
+
+    if (csvProperties.containsKey(FileFormatProperties.CSV.COMMENT)) {
+      settings.getFormat().setComment(
+          csvProperties.get(FileFormatProperties.CSV.COMMENT).charAt(0));
+    }
+
+    if (csvProperties.containsKey(FileFormatProperties.CSV.QUOTE)) {
+      settings.getFormat().setQuote(
+          csvProperties.get(FileFormatProperties.CSV.QUOTE).charAt(0));
+    }
+
+    if (csvProperties.containsKey(FileFormatProperties.CSV.ESCAPE)) {
+      settings.getFormat().setQuoteEscape(
+          csvProperties.get(FileFormatProperties.CSV.ESCAPE).charAt(0));
+    }
+
+    if (csvProperties.containsKey(FileFormatProperties.CSV.SKIP_EMPTY_LINE)) {
+      settings.setSkipEmptyLines(
+          Boolean.parseBoolean(csvProperties.get(FileFormatProperties.CSV.SKIP_EMPTY_LINE)));
+    }
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (isVectorReader) {
+      return nextColumnarBatch();
+    }
+
+    return nextRow();
+  }
+
+  private boolean nextColumnarBatch() throws IOException {
+    return scanAndFillBatch();
+  }
+
+  private boolean scanAndFillBatch() throws IOException {
+    int rowNum = 0;
+    if (null == filter) {
+      while (readRowFromFile() && rowNum < MAX_BATCH_SIZE) {
+        System.arraycopy(outputValues, 0, batchOutputValues[rowNum++], 0, outputValues.length);
+      }
+    } else {
+      try {
+        while (readRowFromFile() && rowNum < MAX_BATCH_SIZE) {
+          if (filter.applyFilter(internalRow, carbonTable.getDimensionOrdinalMax())) {
+            System.arraycopy(outputValues, 0, batchOutputValues[rowNum++], 0, outputValues.length);
+          }
+        }
+      } catch (FilterUnsupportedException e) {
+        throw new IOException("Failed to filter row in CarbonCsvRecordReader", 
e);
+      }
+    }
+    if (rowNum < MAX_BATCH_SIZE) {
+      Object[][] tmpBatchOutputValues = new Object[rowNum][];
+      // copy only the rows actually filled into a right-sized array
+      System.arraycopy(batchOutputValues, 0, tmpBatchOutputValues, 0, rowNum);
+      columnarBatch = readSupport.readRow(tmpBatchOutputValues);
+    } else {
+      columnarBatch = readSupport.readRow(batchOutputValues);
+    }
+    return rowNum > 0;
+  }
+
+  private boolean nextRow() throws IOException {
+    if (csvParser == null) {
+      return false;
+    }
+
+    if (!readRowFromFile()) {
+      return false;
+    }
+
+    if (null == filter) {
+      outputRow = readSupport.readRow(outputValues);
+      return true;
+    } else {
+      try {
+        boolean scanMore;
+        do {
+          scanMore = !filter.applyFilter(internalRow, carbonTable.getDimensionOrdinalMax());
+          if (!scanMore) {
+            outputRow = readSupport.readRow(outputValues);
+            return true;
+          }
+        } while (readRowFromFile());
+        // if we reach the end of the file while scanMore is still true, there are no more rows
+        return false;
+      } catch (FilterUnsupportedException e) {
+        throw new IOException("Failed to filter row in CarbonCsvRecordReader", 
e);
+      }
+    }
+  }
+
+  /**
+   * read from csv file and convert to internal row
+   * todo: prune with project/filter
+   * @return false, if it comes to an end
+   */
+  private boolean readRowFromFile() {
+    String[] parsedOut = csvParser.parseNext();
+    if (parsedOut == null) {
+      return false;
+    } else {
+      convertToInternalRow(parsedOut);
+      convertToOutputRow(parsedOut);
+      return true;
+    }
+  }
+
+  /**
+   * Convert the original csv string row to a carbondata internal row.
+   * The row will be used for filtering. Note that the dimensions are at the head
+   * while measures are at the end, so we need to adjust the values.
+   */
+  private void convertToInternalRow(String[] csvLine) {
+    try {
+      for (int i = 0; i < carbonColumns.length; i++) {
+        internalValues[i] = convertOriginValue2Carbon(
+            csvLine[schema2csvIdx[filterColumn2SchemaIdx[i]]],
+            carbonColumns[filterColumn2SchemaIdx[i]].getDataType());
+      }
+    } catch (UnsupportedEncodingException e) {
+      LOGGER.error(e, "Error occurred while converting input to internal row");
+      throw new RuntimeException(e);
+    }
+    internalRow.setValues(internalValues);
+  }
+
+  /**
+   * Since the sequence of output columns differs from the input, we need to adjust them
+   */
+  private void convertToOutputRow(String[] csvLine) {
+    for (int i = 0; i < projection.length; i++) {
+      outputValues[i] = csvLine[schema2csvIdx[projectionColumn2SchemaIdx[i]]];
+    }
+  }
+
+  private Object convertOriginValue2Carbon(String value,
+      org.apache.carbondata.core.metadata.datatype.DataType t) throws UnsupportedEncodingException {
+    if (null == value) {
+      return null;
+    } else {
+      if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
+        return Boolean.parseBoolean(value);
+      } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE) {
+        return Byte.parseByte(value);
+      } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
+        return Short.parseShort(value);
+      } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
+        return Integer.parseInt(value);
+      } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+        return Long.parseLong(value);
+      } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.FLOAT) {
+        return Float.parseFloat(value);
+      } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
+        return Double.parseDouble(value);
+      } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+        return value.getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+      } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(t)) {
+        BigDecimal javaDecimal = new BigDecimal(value);
+        return DataTypeUtil.bigDecimalToByte(javaDecimal);
+      } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
+        return Integer.parseInt(value);
+      } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
+        return Long.parseLong(value);
+      } else {
+        throw new RuntimeException("Unsupported datatype in CsvRecordReader");
+      }
+    }
+  }
+
+  @Override
+  public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override
+  public T getCurrentValue() throws IOException, InterruptedException {
+    if (isVectorReader) {
+      if (inputMetricsStats != null) {
+        inputMetricsStats.incrementRecordRead(1L);
+      }
+      return (T) columnarBatch;
+    } else {
+      if (inputMetricsStats != null) {
+        inputMetricsStats.incrementRecordRead(1L);
+      }
+      return (T) outputRow;
+    }
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (reader != null) {
+        reader.close();
+      }
+      if (csvParser != null) {
+        csvParser.stopParsing();
+      }
+      if (readSupport != null) {
+        readSupport.close();
+      }
+    } finally {
+      reader = null;
+      csvParser = null;
+    }
+  }
+}

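To illustrate the mapping built by initializedIdxMapping() above, a standalone
sketch (not part of this patch; plain Java, no Carbon classes involved):

  import java.util.Arrays;

  public class HeaderMappingSketch {
    public static void main(String[] args) {
      String[] schema = {"col1", "col2", "col3"};        // table schema order
      String[] csvHeader = "col2,col3,col1".split(",");  // csv column order
      // schema2csvIdx[schemaOrdinal] = position of that column in a csv line
      int[] schema2csvIdx = new int[schema.length];
      for (int i = 0; i < csvHeader.length; i++) {
        boolean found = false;
        for (int j = 0; j < schema.length; j++) {
          if (csvHeader[i].trim().equalsIgnoreCase(schema[j])) {
            schema2csvIdx[j] = i;
            found = true;
            break;
          }
        }
        if (!found) {
          throw new RuntimeException("Cannot find csv header '" + csvHeader[i] + "' in table fields");
        }
      }
      // prints [2, 0, 1]: col1 is the 3rd csv column, col2 the 1st, col3 the 2nd
      System.out.println(Arrays.toString(schema2csvIdx));
    }
  }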
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 8755176..5b68993 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -174,9 +174,14 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
     List<InputSplit> result = new LinkedList<InputSplit>();
 
     // for each segment fetch blocks matching filter in Driver BTree
-    List<CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList);
+    List<CarbonInputSplit> dataBlocksOfSegment;
+    if (carbonTable.getTableInfo().getFormat().equals("carbondata")) {
+      dataBlocksOfSegment = getDataBlocksOfSegment(job, carbonTable, filterResolver,
+          matchedPartitions, validSegments, partitionInfo, oldPartitionIdList);
+    } else {
+      dataBlocksOfSegment = getDataBlocksOfSegment4ExternalFormat(job, carbonTable, filterResolver,
+          validSegments);
+    }
     numBlocks = dataBlocksOfSegment.size();
     result.addAll(dataBlocksOfSegment);
     return result;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 3688026..1c4cc79 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.DataMapJob;
 import org.apache.carbondata.core.datamap.DataMapUtil;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
@@ -52,6 +53,9 @@ import org.apache.carbondata.core.scan.model.QueryModelBuilder;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -63,12 +67,15 @@ import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.CsvRecordReader;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -357,6 +364,31 @@ m filterExpression
     }
   }
 
+  protected List<CarbonInputSplit> getDataBlocksOfSegment4ExternalFormat(JobContext job,
+      CarbonTable carbonTable, FilterResolverIntf resolver, List<Segment> segmentIds)
+      throws IOException {
+
+    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+    QueryStatistic statistic = new QueryStatistic();
+
+    // get tokens for all the required FileSystem for table path
+    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+        new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
+    List<ExtendedBlocklet> prunedFiles = getPrunedFiles4ExternalFormat(job, carbonTable,
+        resolver, segmentIds);
+    List<CarbonInputSplit> resultFilteredFiles = new ArrayList<>();
+
+    for (ExtendedBlocklet blocklet : prunedFiles) {
+      List<CarbonInputSplit> inputSplits = convertToInputSplit4ExternalFormat(job, blocklet);
+      resultFilteredFiles.addAll(inputSplits);
+    }
+
+    statistic
+        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+    recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+    return resultFilteredFiles;
+  }
+
   /**
    * get data blocks of given segment
    */
@@ -475,6 +507,32 @@ m filterExpression
     return prunedBlocklets;
   }
 
+  private List<ExtendedBlocklet> getPrunedFiles4ExternalFormat(JobContext job,
+      CarbonTable carbonTable,
+      FilterResolverIntf resolver, List<Segment> segmentIds) throws IOException {
+    ExplainCollector.addPruningInfo(carbonTable.getTableName());
+    if (resolver != null) {
+      ExplainCollector.setFilterStatement(resolver.getFilterExpression().getStatement());
+    } else {
+      ExplainCollector.setFilterStatement("none");
+    }
+
+    // there is no default datamap for external format, so return all files
+    List<ExtendedBlocklet> prunedFiles = new ArrayList<>();
+    LoadMetadataDetails[] loadMetadatas = SegmentStatusManager.readTableStatusFile(
+        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()));
+    for (LoadMetadataDetails loadMetadata : loadMetadatas) {
+      for (String file : loadMetadata.getFactFilePath().split(",")) {
+        ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet(file, "0");
+        extendedBlocklet.setSegmentId(loadMetadata.getLoadName());
+        prunedFiles.add(extendedBlocklet);
+      }
+    }
+
+    // todo: skip datamap prune now, will add it back later
+    return prunedFiles;
+  }
+
   /**
    * Prune the segments from the already pruned blocklets.
    * @param segments
@@ -515,12 +573,72 @@ m filterExpression
     return split;
   }
 
+  private List<CarbonInputSplit> convertToInputSplit4ExternalFormat(JobContext jobContext,
+      ExtendedBlocklet extendedBlocklet) throws IOException {
+    List<CarbonInputSplit> splits = new ArrayList<CarbonInputSplit>();
+    String factFilePath = extendedBlocklet.getFilePath();
+    Path path = new Path(factFilePath);
+    FileSystem fs = FileFactory.getFileSystem(path);
+    FileStatus fileStatus = fs.getFileStatus(path);
+    long length = fileStatus.getLen();
+    if (length != 0) {
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(path, 0, length);
+      long blkSize = fileStatus.getBlockSize();
+      long minSplitSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(jobContext));
+      long maxSplitSize = getMaxSplitSize(jobContext);
+      long splitSize = computeSplitSize(blkSize, minSplitSize, maxSplitSize);
+      long bytesRemaining = fileStatus.getLen();
+      while (((double) bytesRemaining) / splitSize > 1.1) {
+        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+        splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path,
+            length - bytesRemaining,
+            splitSize, blkLocations[blkIndex].getHosts(),
+            blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL));
+        bytesRemaining -= splitSize;
+      }
+      if (bytesRemaining != 0) {
+        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+        splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path,
+            length - bytesRemaining,
+            bytesRemaining, blkLocations[blkIndex].getHosts(),
+            blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL));
+      }
+    } else {
+      splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, 0, length,
+          new String[0], FileFormat.EXTERNAL));
+    }
+    return splits;
+  }
+
   @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
     CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
-    return new CarbonRecordReader<T>(queryModel, readSupport);
+    if (inputSplit instanceof CarbonMultiBlockSplit
+        && ((CarbonMultiBlockSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) {
+      return createRecordReaderForExternalFormat(queryModel, readSupport,
+          configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY));
+    } else if (inputSplit instanceof CarbonInputSplit
+        && ((CarbonInputSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) {
+      return createRecordReaderForExternalFormat(queryModel, readSupport,
+          configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY));
+    } else {
+      return new CarbonRecordReader<T>(queryModel, readSupport);
+    }
+  }
+
+  private RecordReader<Void, T> createRecordReaderForExternalFormat(QueryModel queryModel,
+      CarbonReadSupport readSupport, String format) {
+    try {
+      if ("csv".equals(format)) {
+        return new CsvRecordReader<T>(queryModel, readSupport);
+      } else {
+        throw new RuntimeException("Unsupported external file format " + 
format);
+      }
+    } catch (Throwable e) {
+      throw new RuntimeException("Failed to create recordReader for format " + 
format, e);
+    }
   }
 
   public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)

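The 1.1 factor in convertToInputSplit4ExternalFormat() above folds a small
remainder into the last split instead of emitting a tiny extra one. A
standalone sketch of the arithmetic (file length and split size are
illustrative):

  public class SplitSlopSketch {
    public static void main(String[] args) {
      long length = 210L;     // hypothetical file length in bytes
      long splitSize = 100L;  // hypothetical computed split size
      long bytesRemaining = length;
      // cut a full split only while more than 1.1 * splitSize remains
      while (((double) bytesRemaining) / splitSize > 1.1) {
        System.out.println("split: offset=" + (length - bytesRemaining) + " size=" + splitSize);
        bytesRemaining -= splitSize;
      }
      if (bytesRemaining != 0) {
        // the 110-byte remainder becomes one split rather than 100 + 10
        System.out.println("split: offset=" + (length - bytesRemaining) + " size=" + bytesRemaining);
      }
    }
  }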
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index b549b16..8d10ec5 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -524,9 +524,15 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
 
     // for each segment fetch blocks matching filter in Driver BTree
-    List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList);
+    List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment;
+    if (carbonTable.getTableInfo().getFormat().equals("")
+        || carbonTable.getTableInfo().getFormat().equals("carbondata")) {
+      dataBlocksOfSegment = getDataBlocksOfSegment(job, carbonTable, filterResolver,
+          matchedPartitions, validSegments, partitionInfo, oldPartitionIdList);
+    } else {
+      dataBlocksOfSegment = getDataBlocksOfSegment4ExternalFormat(job, carbonTable, filterResolver,
+          validSegments);
+    }
     numBlocks = dataBlocksOfSegment.size();
     for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/integration/spark-common-test/src/test/resources/datawithoutheader_delimiter_separator.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/datawithoutheader_delimiter_separator.csv b/integration/spark-common-test/src/test/resources/datawithoutheader_delimiter_separator.csv
new file mode 100644
index 0000000..62efa68
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/datawithoutheader_delimiter_separator.csv
@@ -0,0 +1,10 @@
+11|arvind|SE|17-01-2007|1|developer|10|network|928478|17-02-2007|29-11-2016|96|96|5040
+12|krithin|SSE|29-05-2008|1|developer|11|protocol|928378|29-06-2008|30-12-2016|85|95|7124
+13|madhan|TPL|07-07-2009|2|tester|10|network|928478|07-08-2009|30-12-2016|88|99|9054
+14|anandh|SA|29-12-2010|3|manager|11|protocol|928278|29-01-2011|29-06-2016|77|92|11248
+15|ayushi|SSA|09-11-2011|1|developer|12|security|928375|09-12-2011|29-05-2016|99|91|13245
+16|pramod|SE|14-10-2012|1|developer|13|configManagement|928478|14-11-2012|29-12-2016|86|93|5040
+17|gawrav|PL|22-09-2013|2|tester|12|security|928778|22-10-2013|15-11-2016|78|97|9574
+18|sibi|TL|15-08-2014|2|tester|14|Learning|928176|15-09-2014|29-05-2016|84|98|7245
+19|shivani|PL|12-05-2015|1|developer|10|network|928977|12-06-2015|12-11-2016|88|91|11254
+20|bill|PM|01-12-2015|3|manager|14|Learning|928479|01-01-2016|30-11-2016|75|94|13547

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala
new file mode 100644
index 0000000..7f07878
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.externalformat
+
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.Spark2TestQueryExecutor
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class CsvBasedCarbonTableSuite extends QueryTest
+  with BeforeAndAfterEach with BeforeAndAfterAll {
+
+  val carbonTable = "fact_carbon_table"
+  val csvCarbonTable = "fact_carbon_csv_table"
+  val csvFile = s"$resourcesPath/datawithoutheader.csv"
+  val csvFile_delimiter_separator = s"$resourcesPath/datawithoutheader_delimiter_separator.csv"
+
+  // prepare normal carbon table for comparison
+  override protected def beforeAll(): Unit = {
+    sql(s"DROP TABLE IF EXISTS $carbonTable")
+    sql(
+      s"""
+         | CREATE TABLE $carbonTable(empno smallint, empname String, designation string,
+         | doj String, workgroupcategory int, workgroupcategoryname String, deptno int,
+         | deptname String, projectcode int, projectjoindate String, projectenddate String,
+         | attendance String, utilization String, salary String)
+         | STORED BY 'carbondata'
+       """.stripMargin
+    )
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$csvFile' INTO TABLE $carbonTable
+         | OPTIONS('DELIMITER'=',',
+         | 'QUOTECHAR'='\"',
+         | 'FILEHEADER'='EMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY')
+        """.stripMargin)
+  }
+
+  override protected def afterAll(): Unit = {
+    sql(s"DROP TABLE IF EXISTS $carbonTable")
+  }
+
+  override protected def beforeEach(): Unit = {
+    sql(s"DROP TABLE IF EXISTS $csvCarbonTable")
+  }
+
+  override protected def afterEach(): Unit = {
+    sql(s"DROP TABLE IF EXISTS $csvCarbonTable")
+  }
+
+  private def checkQuery() {
+    // query all the columns
+    checkAnswer(sql(s"SELECT eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY FROM $csvCarbonTable WHERE empno = 15"),
+      sql(s"SELECT eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY FROM $carbonTable WHERE empno = 15"))
+    // query part of the columns
+    checkAnswer(sql(s"SELECT empno,empname, deptname, doj FROM $csvCarbonTable WHERE empno = 15"),
+      sql(s"SELECT empno,empname, deptname, doj FROM $carbonTable WHERE empno = 15"))
+    // sequence of projection columns is not the same as in the DDL
+    checkAnswer(sql(s"SELECT empname, empno, deptname, doj FROM $csvCarbonTable WHERE empno = 15"),
+      sql(s"SELECT empname, empno, deptname, doj FROM $carbonTable WHERE empno = 15"))
+    // query with greater-than filter
+    checkAnswer(sql(s"SELECT empname, empno, deptname, doj FROM $csvCarbonTable WHERE empno > 15"),
+      sql(s"SELECT empname, empno, deptname, doj FROM $carbonTable WHERE empno > 15"))
+    // query with filter on a dimension
+    checkAnswer(sql(s"SELECT empname, empno, deptname, doj FROM $csvCarbonTable WHERE empname = 'ayushi'"),
+      sql(s"SELECT empname, empno, deptname, doj FROM $carbonTable WHERE empname = 'ayushi'"))
+    // aggregate query
+    checkAnswer(sql(s"SELECT designation, sum(empno), avg(empno) FROM $csvCarbonTable GROUP BY designation"),
+      sql(s"SELECT designation, sum(empno), avg(empno) FROM $carbonTable GROUP BY designation"))
+  }
+
+  test("test csv based carbon table") {
+    // create csv based carbon table
+    sql(
+      s"""
+         | CREATE TABLE $csvCarbonTable(empno smallint, empname String, designation string,
+         | doj String, workgroupcategory int, workgroupcategoryname String, deptno int,
+         | deptname String, projectcode int, projectjoindate String, projectenddate String,
+         | attendance String, utilization String, salary String)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         | 'format'='csv',
+         | 'csv.header'='eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY'
+         | )
+       """.stripMargin
+    )
+    // check that the external format info is stored in tableinfo
+    val tblInfo =
+      CarbonEnv.getCarbonTable(Option("default"), csvCarbonTable)(Spark2TestQueryExecutor.spark)
+    assertResult("csv")(tblInfo.getTableInfo.getFormat)
+    assertResult(1)(tblInfo.getTableInfo.getFormatProperties.size())
+    assertResult(
+      "eMPno, empname,designation, doj, workgroupcategory, 
workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, 
projectenddate, attendance, utilization, SALARY".toLowerCase)(
+      tblInfo.getTableInfo.getFormatProperties.get("csv.header"))
+
+    // add segment for csv based carbontable
+    sql(s"ALTER TABLE $csvCarbonTable ADD SEGMENT LOCATION '$csvFile'")
+
+    // check that the fact file paths have been stored in tablestatus
+    val metadataPath = CarbonTablePath.getMetadataPath(tblInfo.getTablePath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataPath)
+    assertResult(1)(details.length)
+    assertResult(csvFile)(details(0).getFactFilePath)
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
+    // check query on csv based carbontable
+    // query with vector reader on
+    checkQuery()
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
 "false")
+    // query with vector reader off
+    checkQuery()
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+      CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+  }
+
+  test("test csv based carbon table: only support csv now") {
+    val expectedException = intercept[Exception] {
+      sql(
+        s"""
+           | CREATE TABLE $csvCarbonTable(empname String, empno smallint, designation string,
+           | deptname String, projectcode int, projectjoindate String,projectenddate String,
+           | doj String, workgroupcategory int, workgroupcategoryname String,deptno int,
+           | attendance String, utilization String,salary String)
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'format'='parquet',
+           | 'csv.header'='eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY'
+           | )
+       """.stripMargin
+      )
+    }
+
+    assert(expectedException.getMessage.contains("Currently we only support 
csv as external file format"))
+  }
+
+  test("test csv based carbon table: the sequence of header does not match 
schema") {
+    // create csv based carbon table, the sequence in schema is not the same 
in csv.header
+    sql(
+      s"""
+         | CREATE TABLE $csvCarbonTable(empname String, empno smallint, designation string,
+         | deptname String, projectcode int, projectjoindate String,projectenddate String,
+         | doj String, workgroupcategory int, workgroupcategoryname String,deptno int,
+         | attendance String, utilization String,salary String)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         | 'format'='csv',
+         | 'csv.header'='eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY'
+         | )
+       """.stripMargin
+    )
+    // add a segment for the csv based carbon table
+    sql(s"ALTER TABLE $csvCarbonTable ADD SEGMENT LOCATION '$csvFile'")
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
+    // check query on the csv based carbon table
+    // query with vector reader on
+    checkQuery()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
+    // query with vector reader off
+    checkQuery()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+      CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+  }
+
+  test("test csv based carbon table: not specify the header") {
+    // create csv based carbon table without specifying the csv.header property
+    sql(
+      s"""
+         | CREATE TABLE $csvCarbonTable(empno smallint, empname String, designation string,
+         | doj String, workgroupcategory int, workgroupcategoryname String,deptno int,
+         | deptname String, projectcode int, projectjoindate String,projectenddate String,
+         | attendance String, utilization String,salary String)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         | 'format'='csv'
+         | )
+       """.stripMargin
+    )
+
+    // add a segment for the csv based carbon table
+    sql(s"ALTER TABLE $csvCarbonTable ADD SEGMENT LOCATION '$csvFile'")
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
+    // check query on the csv based carbon table
+    // query with vector reader on
+    checkQuery()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
+    // query with vector reader off
+    checkQuery()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+      CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+  }
+
+  test("test csv based carbon table: user specified delimiter") {
+    // create csv based carbon table with a user-specified field delimiter
+    sql(
+      s"""
+         | CREATE TABLE $csvCarbonTable(empno smallint, empname String, designation string,
+         | doj String, workgroupcategory int, workgroupcategoryname String,deptno int,
+         | deptname String, projectcode int, projectjoindate String,projectenddate String,
+         | attendance String, utilization String,salary String)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         | 'format'='csv',
+         | 'csv.delimiter'='|'
+         | )
+       """.stripMargin
+    )
+
+    // add a segment for the csv based carbon table
+    sql(s"ALTER TABLE $csvCarbonTable ADD SEGMENT LOCATION 
'$csvFile_delimiter_separator'")
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
+    // check query on the csv based carbon table
+    // query with vector reader on
+    checkQuery()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
+    // query with vector reader off
+    checkQuery()
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+      CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+  }
+}
\ No newline at end of file
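
Note: the tests above repeat the same ENABLE_VECTOR_READER on/off/reset sequence. A small loan-pattern helper along the following lines (hypothetical, not part of this patch; it uses only the CarbonProperties and CarbonCommonConstants calls already shown) would run a check in both modes and restore the default even when a query fails:

    // Hypothetical helper, sketched against the property API used in the suite.
    def withBothVectorReaderModes(check: => Unit): Unit = {
      try {
        Seq("true", "false").foreach { enabled =>
          // run the same check once per vector-reader mode
          CarbonProperties.getInstance()
            .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, enabled)
          check
        }
      } finally {
        // always restore the default so later suites are unaffected
        CarbonProperties.getInstance().addProperty(
          CarbonCommonConstants.ENABLE_VECTOR_READER,
          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
      }
    }

Each test body would then shrink to withBothVectorReaderModes(checkQuery()).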

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/CsvReadSupport.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/CsvReadSupport.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/CsvReadSupport.java
new file mode 100644
index 0000000..53d6d7f
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/CsvReadSupport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.format;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl;
+
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * read support for csv
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.Internal
+public class CsvReadSupport<T> implements CarbonReadSupport<T> {
+  private CarbonColumn[] carbonColumns;
+  private StructType outputSchema;
+  private Object[] finalOutputValues;
+  @Override
+  public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable)
+      throws IOException {
+    this.carbonColumns = carbonColumns;
+    this.finalOutputValues = new Object[carbonColumns.length];
+    outputSchema = new StructType(convertCarbonColumnSpark(carbonColumns));
+  }
+
+  private StructField[] convertCarbonColumnSpark(CarbonColumn[] columns) {
+    return (StructField[]) new SparkDataTypeConverterImpl().convertCarbonSchemaToSparkSchema(
+        columns);
+  }
+
+  @Override
+  public T readRow(Object[] data) {
+    for (int i = 0; i < carbonColumns.length; i++) {
+      Object originValue = data[i];
+      org.apache.spark.sql.types.DataType t = outputSchema.apply(i).dataType();
+      finalOutputValues[i] = convertToSparkValue(originValue, t);
+    }
+    return (T) new GenericInternalRow(finalOutputValues);
+  }
+  private Object convertToSparkValue(Object originValue, org.apache.spark.sql.types.DataType t) {
+    if (null == originValue) {
+      return null;
+    } else {
+      String value = String.valueOf(originValue);
+      if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+        return Boolean.parseBoolean(value);
+      } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+        return Byte.parseByte(value);
+      } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+        return Short.parseShort(value);
+      } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+        return Integer.parseInt(value);
+      } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+        return Long.parseLong(value);
+      } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+        return Float.parseFloat(value);
+      } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+        return Double.parseDouble(value);
+      } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+        return UTF8String.fromString(value);
+      } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+        return Decimal.fromDecimal(value);
+      } else if (t instanceof CalendarIntervalType) {
+        return CalendarInterval.fromString(value);
+      } else if (t instanceof org.apache.spark.sql.types.DateType) {
+        return Integer.parseInt(value);
+      } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+        return Long.parseLong(value);
+      } else {
+        return null;
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+
+  }
+}
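
Note that CsvReadSupport.readRow fills the same finalOutputValues array on every call, so each returned GenericInternalRow aliases state that the next row overwrites. Callers that buffer rows must copy them first; a minimal sketch, assuming only the standard Spark 2.x InternalRow API:

    import org.apache.spark.sql.catalyst.InternalRow

    // Copy each row before buffering: InternalRow.copy() allocates fresh storage,
    // detaching the row from the reader's reused backing array.
    def materialize(rows: Iterator[InternalRow]): Seq[InternalRow] =
      rows.map(_.copy()).toSeq

This reuse is generally safe for purely streaming consumption, where each row is fully processed before the next readRow call.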

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/VectorCsvReadSupport.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/VectorCsvReadSupport.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/VectorCsvReadSupport.java
new file mode 100644
index 0000000..81bd25d
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/VectorCsvReadSupport.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.format;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * read support for csv vector reader
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.Internal
+public class VectorCsvReadSupport<T> implements CarbonReadSupport<T> {
+  private static final int MAX_BATCH_SIZE =
+      CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+  private CarbonColumn[] carbonColumns;
+  private ColumnarBatch columnarBatch;
+  private StructType outputSchema;
+
+  @Override
+  public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable)
+      throws IOException {
+    this.carbonColumns = carbonColumns;
+    outputSchema = new StructType(convertCarbonColumnSpark(carbonColumns));
+  }
+
+  private StructField[] convertCarbonColumnSpark(CarbonColumn[] columns) {
+    return (StructField[]) new SparkDataTypeConverterImpl().convertCarbonSchemaToSparkSchema(
+        columns);
+  }
+
+  @Override
+  public T readRow(Object[] data) {
+    columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, MAX_BATCH_SIZE);
+    int rowId = 0;
+    for (; rowId < data.length; rowId++) {
+      for (int colIdx = 0; colIdx < carbonColumns.length; colIdx++) {
+        Object originValue = ((Object[]) data[rowId])[colIdx];
+        ColumnVector col = columnarBatch.column(colIdx);
+        org.apache.spark.sql.types.DataType t = col.dataType();
+        if (null == originValue) {
+          col.putNull(rowId);
+        } else {
+          String value = String.valueOf(originValue);
+          if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+            col.putBoolean(rowId, Boolean.parseBoolean(value));
+          } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+            col.putByte(rowId, Byte.parseByte(value));
+          } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+            col.putShort(rowId, Short.parseShort(value));
+          } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+            col.putInt(rowId, Integer.parseInt(value));
+          } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+            col.putLong(rowId, Long.parseLong(value));
+          } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+            col.putFloat(rowId, Float.parseFloat(value));
+          } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+            col.putDouble(rowId, Double.parseDouble(value));
+          } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+            UTF8String v = UTF8String.fromString(value);
+            col.putByteArray(rowId, v.getBytes());
+          } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+            DecimalType dt = (DecimalType)t;
+            Decimal d = Decimal.fromDecimal(value);
+            if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+              col.putInt(rowId, (int)d.toUnscaledLong());
+            } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+              col.putLong(rowId, d.toUnscaledLong());
+            } else {
+              final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+              byte[] bytes = integer.toByteArray();
+              col.putByteArray(rowId, bytes, 0, bytes.length);
+            }
+          } else if (t instanceof CalendarIntervalType) {
+            CalendarInterval c = CalendarInterval.fromString(value);
+            col.getChildColumn(0).putInt(rowId, c.months);
+            col.getChildColumn(1).putLong(rowId, c.microseconds);
+          } else if (t instanceof org.apache.spark.sql.types.DateType) {
+            col.putInt(rowId, Integer.parseInt(value));
+          } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+            col.putLong(rowId, Long.parseLong(value));
+          }
+        }
+      }
+    }
+    columnarBatch.setNumRows(rowId);
+    return (T) columnarBatch;
+  }
+
+  @Override
+  public void close() {
+    if (columnarBatch != null) {
+      columnarBatch.close();
+    }
+  }
+}
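
VectorCsvReadSupport stores a DecimalType value as its unscaled integer when the precision fits Spark's int or long thresholds, and falls back to the raw big-integer bytes otherwise. A standalone sketch of that three-way encoding (plain Scala; the 9- and 18-digit cutoffs mirror Spark's Decimal.MAX_INT_DIGITS and Decimal.MAX_LONG_DIGITS, and the input text is assumed to fit the declared scale):

    import java.math.{BigDecimal, RoundingMode}

    sealed trait EncodedDecimal
    case class AsInt(v: Int) extends EncodedDecimal
    case class AsLong(v: Long) extends EncodedDecimal
    case class AsBytes(v: Array[Byte]) extends EncodedDecimal

    // Pick the narrowest representation the declared precision allows.
    def encodeDecimal(text: String, precision: Int, scale: Int): EncodedDecimal = {
      val unscaled = new BigDecimal(text).setScale(scale, RoundingMode.HALF_UP).unscaledValue()
      if (precision <= 9) AsInt(unscaled.intValueExact())
      else if (precision <= 18) AsLong(unscaled.longValueExact())
      else AsBytes(unscaled.toByteArray)
    }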

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 6483dc2..afd3af2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -58,6 +58,7 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
+import org.apache.carbondata.spark.format.{CsvReadSupport, VectorCsvReadSupport}
 import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
 import org.apache.carbondata.streaming.CarbonStreamInputFormat
 
@@ -87,6 +88,7 @@ class CarbonScanRDD[T: ClassTag](
   private var vectorReader = false
 
   private val bucketedTable = tableInfo.getFactTable.getBucketingInfo
+  private val storageFormat = tableInfo.getFormat
 
  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -133,10 +135,13 @@ class CarbonScanRDD[T: ClassTag](
       // 2. for stream splits, create partition for each split by default
       val columnarSplits = new ArrayList[InputSplit]()
       val streamSplits = new ArrayBuffer[InputSplit]()
+      val externalSplits = new ArrayBuffer[InputSplit]()
       splits.asScala.foreach { split =>
         val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
         if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) {
           streamSplits += split
+        } else if (FileFormat.EXTERNAL == carbonInputSplit.getFileFormat) {
+          externalSplits += split
         } else {
           columnarSplits.add(split)
         }
@@ -146,31 +151,39 @@ class CarbonScanRDD[T: ClassTag](
       distributeEndTime = System.currentTimeMillis()
       // check and remove InExpression from filterExpression
       checkAndRemoveInExpressinFromFilterExpression(batchPartitions)
-      if (streamSplits.isEmpty) {
-        partitions = batchPartitions.toArray
-      } else {
-        val index = batchPartitions.length
-        val streamPartitions: mutable.Buffer[Partition] =
-          streamSplits.zipWithIndex.map { splitWithIndex =>
-            val multiBlockSplit =
-              new CarbonMultiBlockSplit(
-                Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
-                splitWithIndex._1.getLocations,
-                FileFormat.ROW_V1)
-            new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit)
-          }
-        if (batchPartitions.isEmpty) {
-          partitions = streamPartitions.toArray
-        } else {
-          logInfo(
-            s"""
-               | Identified no.of Streaming Blocks: ${ streamPartitions.size },
-          """.stripMargin)
-          // should keep the order by index of partition
-          batchPartitions.appendAll(streamPartitions)
-          partitions = batchPartitions.toArray
+
+      def generateNonBatchPartitions(index: Int, splits: ArrayBuffer[InputSplit],
+          format: FileFormat): mutable.Buffer[Partition] = {
+        splits.zipWithIndex.map { splitWithIndex =>
+          val multiBlockSplit =
+            new CarbonMultiBlockSplit(
+              Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
+              splitWithIndex._1.getLocations,
+              format)
+          new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit)
         }
       }
+
+      val allPartitions: mutable.Buffer[Partition] = mutable.Buffer()
+      val index = batchPartitions.length
+      val streamPartitions: mutable.Buffer[Partition] = generateNonBatchPartitions(
+        index, streamSplits, FileFormat.ROW_V1)
+      val externalPartitions: mutable.Buffer[Partition] = generateNonBatchPartitions(
+        index + streamPartitions.length, externalSplits, FileFormat.EXTERNAL)
+
+      if (batchPartitions.nonEmpty) {
+        LOGGER.info(s"Identified no.of batch blocks: ${batchPartitions.size}")
+        allPartitions.appendAll(batchPartitions)
+      }
+      if (streamPartitions.nonEmpty) {
+        LOGGER.info(s"Identified no.of stream blocks: 
${streamPartitions.size}")
+        allPartitions.appendAll(streamPartitions)
+      }
+      if (externalPartitions.nonEmpty) {
+        LOGGER.info(s"Identified no.of external blocks: 
${externalPartitions.size}")
+        allPartitions.appendAll(externalPartitions)
+      }
+      partitions = allPartitions.toArray
       partitions
     } finally {
       Profiler.invokeIfEnable {
@@ -358,7 +371,7 @@ class CarbonScanRDD[T: ClassTag](
     }
     logInfo(
       s"""
-         | Identified no.of.blocks: $noOfBlocks,
+         | Identified no.of.blocks(columnar): $noOfBlocks,
          | no.of.tasks: $noOfTasks,
          | no.of.nodes: $noOfNodes,
          | parallelism: $parallelism
@@ -426,6 +439,22 @@ class CarbonScanRDD[T: ClassTag](
          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
             .asInstanceOf[RecordReader[Void, Object]]
           streamReader
+        case FileFormat.EXTERNAL =>
+          require(storageFormat.equals("csv"),
+            "Currently we only support csv as external file format")
+          attemptContext.getConfiguration.set(
+            CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY, storageFormat)
+          val externalRecordReader = format.createRecordReader(inputSplit, attemptContext)
+            .asInstanceOf[CsvRecordReader[Object]]
+          externalRecordReader.setVectorReader(vectorReader)
+          externalRecordReader.setInputMetricsStats(inputMetricsStats)
+          externalRecordReader.setQueryModel(model)
+          if (vectorReader) {
+            externalRecordReader.setReadSupport(new VectorCsvReadSupport[Object]())
+          } else {
+            externalRecordReader.setReadSupport(new CsvReadSupport[Object]())
+          }
+          externalRecordReader
         case _ =>
           // create record reader for CarbonData file format
           if (vectorReader) {
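
The planning change above buckets each input split by file format (columnar, ROW_V1 stream, EXTERNAL) and then concatenates the partition lists in a fixed order so partition indices stay contiguous. The routing-and-indexing idea in isolation, as a standalone sketch with simplified stand-in types (the real CarbonInputSplit and CarbonSparkPartition come from the carbondata modules):

    sealed trait Format
    case object Columnar extends Format
    case object RowV1 extends Format
    case object External extends Format
    final case class Split(format: Format, path: String)
    final case class Partition(index: Int, split: Split)

    // Group splits by format, then number them in a fixed format order so the
    // resulting partition indices are contiguous, as the patch does.
    def plan(splits: Seq[Split]): Seq[Partition] = {
      val buckets = splits.groupBy(_.format)
      Seq(Columnar, RowV1, External)
        .flatMap(f => buckets.getOrElse(f, Seq.empty))
        .zipWithIndex
        .map { case (s, i) => Partition(i, s) }
    }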

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index e534f5f..c7dd546 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -187,6 +187,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val STREAM = carbonKeyWord("STREAM")
   protected val STREAMS = carbonKeyWord("STREAMS")
   protected val STMPROPERTIES = carbonKeyWord("STMPROPERTIES")
+  protected val LOCATION = carbonKeyWord("LOCATION")
 
   protected val doubleQuotedString = "\"([^\"]+)\"".r
   protected val singleQuotedString = "'([^']+)'".r

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e4bfb570/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index c77d0df..a641329 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -736,6 +736,16 @@ class TableNewProcessor(cm: TableModel) {
       cm.tableName))
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
+    val format = cm.tableProperties.get(CarbonCommonConstants.FORMAT)
+    if (format.isDefined) {
+      if (!format.get.equalsIgnoreCase("csv")) {
+        CarbonException.analysisException(s"Currently we only support csv as 
external file format")
+      }
+      tableInfo.setFormat(format.get)
+      val formatProperties = cm.tableProperties.filter(pair =>
+        pair._1.startsWith(s"${format.get.toLowerCase}.")).asJava
+      tableInfo.setFormatProperties(formatProperties)
+    }
     tableInfo
   }
 
