Repository: carbondata
Updated Branches:
  refs/heads/master 982d03fea -> df5d7a99e


[CARBONDATA-1998][SDK] Support CarbonReader to read carbondata files

Support CarbonReader to read carbondata files

This closes #2072


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/df5d7a99
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/df5d7a99
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/df5d7a99

Branch: refs/heads/master
Commit: df5d7a99eabc951eed0d3cf4aec8985d83b74160
Parents: c09ef99
Author: Jacky Li <[email protected]>
Authored: Sat Mar 17 17:18:24 2018 +0800
Committer: ravipesala <[email protected]>
Committed: Fri Mar 23 21:20:24 2018 +0530

----------------------------------------------------------------------
 .../core/indexstore/BlockletDetailInfo.java     |  5 +-
 .../indexstore/blockletindex/IndexWrapper.java  |  3 +-
 .../core/metadata/schema/SchemaReader.java      | 97 ++++++++++++++++++++
 .../core/metadata/schema/table/CarbonTable.java | 82 ++++++++++++++++-
 .../schema/table/TableSchemaBuilder.java        |  9 ++
 .../executor/impl/AbstractQueryExecutor.java    | 14 ++-
 .../core/scan/model/QueryModelBuilder.java      | 73 +++++++++++++++
 .../carbondata/hadoop/CarbonProjection.java     | 11 +++
 .../hadoop/api/CarbonFileInputFormat.java       |  8 +-
 .../hadoop/api/CarbonInputFormat.java           |  7 +-
 .../hadoop/api/CarbonTableInputFormat.java      | 13 +--
 .../hadoop/util/CarbonInputFormatUtil.java      | 40 --------
 .../carbondata/hadoop/util/SchemaReader.java    | 97 --------------------
 .../hive/MapredCarbonInputFormat.java           |  8 +-
 .../management/RefreshCarbonTableCommand.scala  |  4 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  2 +-
 .../carbondata/sdk/file/CarbonReader.java       | 64 +++++++++++++
 .../sdk/file/CarbonReaderBuilder.java           | 95 +++++++++++++++++++
 .../sdk/file/CSVCarbonWriterTest.java           | 34 +++++++
 19 files changed, 495 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index 2865d4b..660f4c1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -209,7 +209,10 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
     this.blockFooterOffset = blockFooterOffset;
   }
 
-  public List<ColumnSchema> getColumnSchemas() {
+  public List<ColumnSchema> getColumnSchemas() throws IOException {
+    if (columnSchemas == null && columnSchemaBinary != null) {
+      readColumnSchema(columnSchemaBinary);
+    }
     return columnSchemas;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
index 95232e5..1de3122 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.indexstore.blockletindex;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
@@ -33,7 +34,7 @@ public class IndexWrapper extends AbstractIndex {
 
   private List<TableBlockInfo> blockInfos;
 
-  public IndexWrapper(List<TableBlockInfo> blockInfos) {
+  public IndexWrapper(List<TableBlockInfo> blockInfos) throws IOException {
     this.blockInfos = blockInfos;
     segmentProperties = new 
SegmentProperties(blockInfos.get(0).getDetailInfo().getColumnSchemas(),
         blockInfos.get(0).getDetailInfo().getDimLens());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
new file mode 100644
index 0000000..787a9b9
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata.schema;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * TODO: It should be removed after store manager implementation.
+ */
+public class SchemaReader {
+
+  public static CarbonTable readCarbonTableFromStore(AbsoluteTableIdentifier 
identifier)
+      throws IOException {
+    String schemaFilePath = 
CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
+    if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
+        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.S3) ||
+        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
+      String tableName = identifier.getCarbonTableIdentifier().getTableName();
+
+      org.apache.carbondata.format.TableInfo tableInfo =
+          
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(identifier.getTablePath()));
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      TableInfo wrapperTableInfo = 
schemaConverter.fromExternalToWrapperTableInfo(
+          tableInfo,
+          identifier.getCarbonTableIdentifier().getDatabaseName(),
+          tableName,
+          identifier.getTablePath());
+      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+      return CarbonMetadata.getInstance().getCarbonTable(
+          identifier.getCarbonTableIdentifier().getTableUniqueName());
+    } else {
+      throw new IOException("File does not exist: " + schemaFilePath);
+    }
+  }
+
+  /**
+   * the method returns the Wrapper TableInfo
+   *
+   * @param identifier
+   * @return
+   */
+  public static TableInfo getTableInfo(AbsoluteTableIdentifier identifier)
+      throws IOException {
+    org.apache.carbondata.format.TableInfo thriftTableInfo =
+        
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(identifier.getTablePath()));
+    ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter =
+        new ThriftWrapperSchemaConverterImpl();
+    CarbonTableIdentifier carbonTableIdentifier =
+        identifier.getCarbonTableIdentifier();
+    return thriftWrapperSchemaConverter.fromExternalToWrapperTableInfo(
+        thriftTableInfo,
+        carbonTableIdentifier.getDatabaseName(),
+        carbonTableIdentifier.getTableName(),
+        identifier.getTablePath());
+  }
+
+
+  public static TableInfo inferSchema(AbsoluteTableIdentifier identifier)
+      throws IOException {
+    // This routine is going to infer schema from the carbondata file footer
+    // Convert the ColumnSchema -> TableSchema -> TableInfo.
+    // Return the TableInfo.
+    org.apache.carbondata.format.TableInfo tableInfo =
+        CarbonUtil.inferSchema(identifier.getTablePath(), identifier, false);
+    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+    TableInfo wrapperTableInfo = 
schemaConverter.fromExternalToWrapperTableInfo(
+        tableInfo, identifier.getDatabaseName(), identifier.getTableName(),
+        identifier.getTablePath());
+    return wrapperTableInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 9e0d80a..9d50048 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -30,21 +30,32 @@ import java.util.Map;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
+import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryProjection;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeConverter;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.FileHeader;
 
 /**
  * Mapping class for Carbon actual table
@@ -192,6 +203,34 @@ public class CarbonTable implements Serializable {
     }
   }
 
+  public static CarbonTable buildFromDataFile(
+      String tableName, String tablePath, String filePath) throws IOException {
+    CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(filePath);
+    FileHeader fileHeader = carbonHeaderReader.readHeader();
+    TableSchemaBuilder builder = TableSchema.builder();
+    ThriftWrapperSchemaConverterImpl schemaConverter = new 
ThriftWrapperSchemaConverterImpl();
+    for (org.apache.carbondata.format.ColumnSchema column : 
fileHeader.getColumn_schema()) {
+      ColumnSchema columnSchema = 
schemaConverter.fromExternalToWrapperColumnSchema(column);
+      builder.addColumn(
+          new StructField(columnSchema.getColumnName(), 
columnSchema.getDataType()), false);
+    }
+
+    TableSchema tableSchema = builder.tableName(tableName).build();
+    TableInfo tableInfo = new TableInfo();
+    tableInfo.setFactTable(tableSchema);
+    tableInfo.setTablePath(tablePath);
+    tableInfo.setDatabaseName("default");
+    tableInfo.setTableUniqueName(
+        CarbonTable.buildUniqueName("default", tableSchema.getTableName()));
+    return buildFromTableInfo(tableInfo);
+  }
+
+  public static CarbonTable buildFromTablePath(
+      String tableName, String tablePath) throws IOException {
+    return SchemaReader.readCarbonTableFromStore(
+        AbsoluteTableIdentifier.from(tablePath, tableName, "default"));
+  }
+
   /**
    * @param tableInfo
    */
@@ -492,6 +531,20 @@ public class CarbonTable implements Serializable {
   }
 
   /**
+   * Return all dimensions of the table
+   */
+  public List<CarbonDimension> getDimensions() {
+    return tableDimensionsMap.get(getTableName());
+  }
+
+  /**
+   * Return all measures of the table
+   */
+  public List<CarbonMeasure> getMeasures() {
+    return tableMeasuresMap.get(getTableName());
+  }
+
+  /**
    * This will give user created order column
    *
    * @return
@@ -877,7 +930,7 @@ public class CarbonTable implements Serializable {
     return queryModel;
   }
 
-  private QueryProjection createProjection(String[] projectionColumnNames) {
+  public QueryProjection createProjection(String[] projectionColumnNames) {
     String factTableName = getTableName();
     QueryProjection projection = new QueryProjection();
     // fill dimensions
@@ -904,6 +957,33 @@ public class CarbonTable implements Serializable {
     return projection;
   }
 
+  public void processFilterExpression(Expression filterExpression,
+      boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
+    QueryModel.processFilterExpression(this, filterExpression, 
isFilterDimensions,
+        isFilterMeasures);
+
+    if (null != filterExpression) {
+      // Optimize Filter Expression and fit RANGE filters if conditions apply.
+      FilterOptimizer rangeFilterOptimizer =
+          new RangeFilterOptmizer(filterExpression);
+      rangeFilterOptimizer.optimizeFilter();
+    }
+  }
+
+  /**
+   * Resolve the filter expression.
+   */
+  public FilterResolverIntf resolveFilter(Expression filterExpression,
+      TableProvider tableProvider) {
+    try {
+      FilterExpressionProcessor filterExpressionProcessor = new 
FilterExpressionProcessor();
+      return filterExpressionProcessor.getFilterResolver(
+          filterExpression, getAbsoluteTableIdentifier(), tableProvider);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while resolving filter expression", e);
+    }
+  }
+
   /**
    * Create a {@link CarbonTableBuilder} to create {@link CarbonTable}
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 8fdcbb1..2dd5a9e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -47,6 +47,8 @@ public class TableSchemaBuilder {
 
   private int blockSize;
 
+  private String tableName;
+
   public TableSchemaBuilder blockSize(int blockSize) {
     if (blockSize <= 0) {
       throw new IllegalArgumentException("blockSize should be greater than 0");
@@ -55,8 +57,15 @@ public class TableSchemaBuilder {
     return this;
   }
 
+  public TableSchemaBuilder tableName(String tableName) {
+    Objects.requireNonNull(tableName);
+    this.tableName = tableName;
+    return this;
+  }
+
   public TableSchema build() {
     TableSchema schema = new TableSchema();
+    schema.setTableName(tableName);
     schema.setTableId(UUID.randomUUID().toString());
     schema.setPartitionInfo(null);
     schema.setBucketingInfo(null);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 22d1df1..8f77996 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -265,11 +265,15 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
       AbstractIndex abstractIndex = queryProperties.dataBlocks.get(i);
       BlockletDataRefNode dataRefNode =
           (BlockletDataRefNode) abstractIndex.getDataRefNode();
-      blockExecutionInfoList.add(getBlockExecutionInfoForBlock(queryModel, 
abstractIndex,
-          
dataRefNode.getBlockInfos().get(0).getBlockletInfos().getStartBlockletNumber(),
-          dataRefNode.numberOfNodes(), 
dataRefNode.getBlockInfos().get(0).getFilePath(),
-          dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(),
-          dataRefNode.getBlockInfos().get(0).getSegmentId()));
+      blockExecutionInfoList.add(
+          getBlockExecutionInfoForBlock(
+              queryModel,
+              abstractIndex,
+              
dataRefNode.getBlockInfos().get(0).getBlockletInfos().getStartBlockletNumber(),
+              dataRefNode.numberOfNodes(),
+              dataRefNode.getBlockInfos().get(0).getFilePath(),
+              dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(),
+              dataRefNode.getBlockInfos().get(0).getSegmentId()));
     }
     if (null != queryModel.getStatisticsRecorder()) {
       QueryStatistic queryStatistic = new QueryStatistic();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
new file mode 100644
index 0000000..f40bd8b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.model;
+
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+public class QueryModelBuilder {
+
+  private CarbonTable carbonTable;
+
+  public QueryModelBuilder(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+  }
+
+  public QueryModel build(String[] projectionColumnNames, Expression 
filterExpression) {
+    QueryModel queryModel = QueryModel.newInstance(carbonTable);
+    QueryProjection projection = 
carbonTable.createProjection(projectionColumnNames);
+    queryModel.setProjection(projection);
+    boolean[] isFilterDimensions = new 
boolean[carbonTable.getDimensionOrdinalMax()];
+    boolean[] isFilterMeasures = new 
boolean[carbonTable.getAllMeasures().size()];
+    carbonTable.processFilterExpression(filterExpression, isFilterDimensions, 
isFilterMeasures);
+    queryModel.setIsFilterDimensions(isFilterDimensions);
+    queryModel.setIsFilterMeasures(isFilterMeasures);
+    FilterResolverIntf filterIntf = 
carbonTable.resolveFilter(filterExpression, null);
+    queryModel.setFilterExpressionResolverTree(filterIntf);
+    return queryModel;
+  }
+
+  public QueryModel build(Expression filterExpression) {
+    QueryProjection projection = new QueryProjection();
+
+    List<CarbonDimension> dimensions = carbonTable.getDimensions();
+    for (int i = 0; i < dimensions.size(); i++) {
+      projection.addDimension(dimensions.get(i), i);
+    }
+    List<CarbonMeasure> measures = carbonTable.getMeasures();
+    for (int i = 0; i < measures.size(); i++) {
+      projection.addMeasure(measures.get(i), i);
+    }
+
+    QueryModel queryModel = QueryModel.newInstance(carbonTable);
+    queryModel.setProjection(projection);
+    boolean[] isFilterDimensions = new 
boolean[carbonTable.getDimensionOrdinalMax()];
+    boolean[] isFilterMeasures = new 
boolean[carbonTable.getAllMeasures().size()];
+    carbonTable.processFilterExpression(filterExpression, isFilterDimensions, 
isFilterMeasures);
+    queryModel.setIsFilterDimensions(isFilterDimensions);
+    queryModel.setIsFilterMeasures(isFilterMeasures);
+    FilterResolverIntf filterIntf = 
carbonTable.resolveFilter(filterExpression, null);
+    queryModel.setFilterExpressionResolverTree(filterIntf);
+    return queryModel;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
index e236615..b5d0b51 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.hadoop;
 
 import java.io.Serializable;
 import java.util.LinkedHashSet;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -29,6 +30,16 @@ public class CarbonProjection implements Serializable {
 
   private Set<String> columns = new LinkedHashSet<>();
 
+  public CarbonProjection() {
+  }
+
+  public CarbonProjection(String[] columnNames) {
+    Objects.requireNonNull(columnNames);
+    for (String columnName : columnNames) {
+      columns.add(columnName);
+    }
+  }
+
   public void addColumn(String column) {
     columns.add(column);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index ff532b7..c0366d2 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import 
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -43,8 +44,6 @@ import 
org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.hadoop.util.SchemaReader;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -119,10 +118,9 @@ public class CarbonFileInputFormat<T> extends 
CarbonInputFormat<T> implements Se
       TableProvider tableProvider = new SingleTableProvider(carbonTable);
       // this will be null in case of corrupt schema file.
       PartitionInfo partitionInfo = 
carbonTable.getPartitionInfo(carbonTable.getTableName());
-      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, 
null);
+      carbonTable.processFilterExpression(filter, null, null);
 
-      FilterResolverIntf filterInterface = CarbonInputFormatUtil
-          .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), 
tableProvider);
+      FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, 
tableProvider);
 
       String segmentDir = 
CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null");
       FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 3cc9c5f..5506af6 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -61,7 +61,6 @@ import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 import 
org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 
 import org.apache.commons.logging.Log;
@@ -411,12 +410,10 @@ public abstract class CarbonInputFormat<T> extends 
FileInputFormat<Void, T> {
     boolean[] isFilterDimensions = new 
boolean[carbonTable.getDimensionOrdinalMax()];
     // getAllMeasures returns list of visible and invisible columns
     boolean[] isFilterMeasures = new 
boolean[carbonTable.getAllMeasures().size()];
-    CarbonInputFormatUtil
-        .processFilterExpression(filter, carbonTable, isFilterDimensions, 
isFilterMeasures);
+    carbonTable.processFilterExpression(filter, isFilterDimensions, 
isFilterMeasures);
     queryModel.setIsFilterDimensions(isFilterDimensions);
     queryModel.setIsFilterMeasures(isFilterMeasures);
-    FilterResolverIntf filterIntf = CarbonInputFormatUtil
-        .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), 
tableProvider);
+    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filter, 
tableProvider);
     queryModel.setFilterExpressionResolverTree(filterIntf);
 
     // update the file level index store if there are invalid segment

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 605ea48..290b3d7 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -58,8 +59,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.hadoop.util.SchemaReader;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -206,7 +205,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
     TableProvider tableProvider = new SingleTableProvider(carbonTable);
     // this will be null in case of corrupt schema file.
     PartitionInfo partitionInfo = 
carbonTable.getPartitionInfo(carbonTable.getTableName());
-    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, 
null);
+    carbonTable.processFilterExpression(filter, null, null);
 
     // prune partitions for filter query on partition table
     BitSet matchedPartitions = null;
@@ -221,8 +220,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
       }
     }
 
-    FilterResolverIntf filterInterface = CarbonInputFormatUtil
-        .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), 
tableProvider);
+    FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, 
tableProvider);
 
     // do block filtering and get split
     List<InputSplit> splits =
@@ -385,7 +383,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
         throw new IOException("Missing/Corrupt schema file for table.");
       }
 
-      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, 
null);
+      carbonTable.processFilterExpression(filter, null, null);
 
       TableProvider tableProvider = new SingleTableProvider(carbonTable);
       // prune partitions for filter query on partition table
@@ -404,8 +402,7 @@ public class CarbonTableInputFormat<T> extends 
CarbonInputFormat<T> {
         }
       }
 
-      FilterResolverIntf filterInterface =
-          CarbonInputFormatUtil.resolveFilter(filter, identifier, 
tableProvider);
+      FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, 
tableProvider);
       // do block filtering and get split
       List<InputSplit> splits = getSplits(job, filterInterface, segmentList, 
matchedPartitions,
           partitionInfo, oldPartitionIdList, new 
SegmentUpdateStatusManager(identifier));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 9f8c5ec..d6d6603 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -25,14 +25,6 @@ import java.util.Locale;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
-import org.apache.carbondata.core.scan.filter.TableProvider;
-import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
-import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
 import org.apache.hadoop.conf.Configuration;
@@ -71,38 +63,6 @@ public class CarbonInputFormatUtil {
     return carbonTableInputFormat;
   }
 
-  public static void processFilterExpression(Expression filterExpression, 
CarbonTable carbonTable,
-      boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
-    QueryModel.processFilterExpression(carbonTable, filterExpression, 
isFilterDimensions,
-        isFilterMeasures);
-
-    if (null != filterExpression) {
-      // Optimize Filter Expression and fit RANGE filters is conditions apply.
-      FilterOptimizer rangeFilterOptimizer =
-          new RangeFilterOptmizer(filterExpression);
-      rangeFilterOptimizer.optimizeFilter();
-    }
-  }
-
-  /**
-   * Resolve the filter expression.
-   *
-   * @param filterExpression
-   * @param absoluteTableIdentifier
-   * @return
-   */
-  public static FilterResolverIntf resolveFilter(Expression filterExpression,
-      AbsoluteTableIdentifier absoluteTableIdentifier, TableProvider 
tableProvider) {
-    try {
-      FilterExpressionProcessor filterExpressionProcessor = new 
FilterExpressionProcessor();
-      //get resolved filter
-      return filterExpressionProcessor
-          .getFilterResolver(filterExpression, absoluteTableIdentifier, 
tableProvider);
-    } catch (Exception e) {
-      throw new RuntimeException("Error while resolving filter expression", e);
-    }
-  }
-
   public static String createJobTrackerID(java.util.Date date) {
     return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(date);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
deleted file mode 100644
index 9df59e6..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.util;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-/**
- * TODO: It should be removed after store manager implementation.
- */
-public class SchemaReader {
-
-  public static CarbonTable readCarbonTableFromStore(AbsoluteTableIdentifier 
identifier)
-      throws IOException {
-    String schemaFilePath = 
CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
-    if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
-        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
-        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.S3) ||
-        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
-      String tableName = identifier.getCarbonTableIdentifier().getTableName();
-
-      org.apache.carbondata.format.TableInfo tableInfo =
-          
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(identifier.getTablePath()));
-      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-      TableInfo wrapperTableInfo = 
schemaConverter.fromExternalToWrapperTableInfo(
-          tableInfo,
-          identifier.getCarbonTableIdentifier().getDatabaseName(),
-          tableName,
-          identifier.getTablePath());
-      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-      return CarbonMetadata.getInstance().getCarbonTable(
-          identifier.getCarbonTableIdentifier().getTableUniqueName());
-    } else {
-      throw new IOException("File does not exist: " + schemaFilePath);
-    }
-  }
-
-  /**
-   * the method returns the Wrapper TableInfo
-   *
-   * @param identifier
-   * @return
-   */
-  public static TableInfo getTableInfo(AbsoluteTableIdentifier identifier)
-      throws IOException {
-    org.apache.carbondata.format.TableInfo thriftTableInfo =
-        
CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(identifier.getTablePath()));
-    ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter =
-        new ThriftWrapperSchemaConverterImpl();
-    CarbonTableIdentifier carbonTableIdentifier =
-        identifier.getCarbonTableIdentifier();
-    return thriftWrapperSchemaConverter.fromExternalToWrapperTableInfo(
-        thriftTableInfo,
-        carbonTableIdentifier.getDatabaseName(),
-        carbonTableIdentifier.getTableName(),
-        identifier.getTablePath());
-  }
-
-
-  public static TableInfo inferSchema(AbsoluteTableIdentifier identifier)
-      throws IOException {
-    // This routine is going to infer schema from the carbondata file footer
-    // Convert the ColumnSchema -> TableSchema -> TableInfo.
-    // Return the TableInfo.
-    org.apache.carbondata.format.TableInfo tableInfo =
-        CarbonUtil.inferSchema(identifier.getTablePath(), identifier, false);
-    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    TableInfo wrapperTableInfo = 
schemaConverter.fromExternalToWrapperTableInfo(
-        tableInfo, identifier.getDatabaseName(), identifier.getTableName(),
-        identifier.getTablePath());
-    return wrapperTableInfo;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
 
b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 1b57f93..4c9e417 100644
--- 
a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ 
b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -35,9 +36,7 @@ import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
-import org.apache.carbondata.hadoop.util.SchemaReader;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.InvalidPathException;
@@ -146,9 +145,8 @@ public class MapredCarbonInputFormat extends 
CarbonTableInputFormat<ArrayWritabl
         projectionColumns, new DataTypeConverterImpl());
     // set the filter to the query model in order to filter blocklet before 
scan
     Expression filter = getFilterPredicates(configuration);
-    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, 
null);
-    FilterResolverIntf filterIntf =
-        CarbonInputFormatUtil.resolveFilter(filter, identifier, tableProvider);
+    carbonTable.processFilterExpression(filter, null, null);
+    FilterResolverIntf filterIntf = carbonTable.resolveFilter(filter, 
tableProvider);
     queryModel.setFilterExpressionResolverTree(filterIntf);
 
     return queryModel;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index a8316e9..1d91458 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -31,14 +31,14 @@ import org.apache.carbondata.common.logging.{LogService, 
LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonTableIdentifier, SegmentFileStore}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.SchemaReader
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, 
TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, 
SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, 
RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
-import org.apache.carbondata.hadoop.util.SchemaReader
 
 /**
  * Command to register carbon table from existing carbon table data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 55eb5ac..610f9bf 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -35,8 +35,8 @@ import org.apache.spark.util.CarbonReflectionUtils
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.SchemaReader
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
new file mode 100644
index 0000000..8cb8b2c
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.RecordReader;
+
+public class CarbonReader<T> {
+
+  private List<RecordReader<Void, T>> readers;
+
+  private RecordReader<Void, T> currentReader;
+
+  private int index;
+
+  CarbonReader(List<RecordReader<Void, T>> readers) {
+    if (readers.size() == 0) {
+      throw new IllegalArgumentException("no reader");
+    }
+    this.readers = readers;
+    this.index = 0;
+    this.currentReader = readers.get(0);
+  }
+
+  public boolean hasNext() throws IOException, InterruptedException {
+    if (currentReader.nextKeyValue()) {
+      return true;
+    } else {
+      if (index == readers.size() - 1) {
+        // no more readers
+        return false;
+      } else {
+        index++;
+        currentReader = readers.get(index);
+        return currentReader.nextKeyValue();
+      }
+    }
+  }
+
+  public T readNextRow() throws IOException, InterruptedException {
+    return currentReader.getCurrentValue();
+  }
+
+  public static CarbonReaderBuilder builder(String tablePath) {
+    return new CarbonReaderBuilder(tablePath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
new file mode 100644
index 0000000..894b973
--- /dev/null
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
[email protected]
[email protected]
+public class CarbonReaderBuilder {
+
+  private String tablePath;
+  private String[] projectionColumns;
+  private Expression filterExpression;
+
+  CarbonReaderBuilder(String tablePath) {
+    this.tablePath = tablePath;
+  }
+
+  public CarbonReaderBuilder projection(String[] projectionColumnNames) {
+    Objects.requireNonNull(projectionColumnNames);
+    this.projectionColumns = projectionColumnNames;
+    return this;
+  }
+
+  public CarbonReaderBuilder filter(Expression fileterExpression) {
+    Objects.requireNonNull(fileterExpression);
+    this.filterExpression = fileterExpression;
+    return this;
+  }
+
+  public <T> CarbonReader<T> build() throws IOException, InterruptedException {
+    CarbonTable table = CarbonTable.buildFromTablePath("_temp", tablePath);
+
+    final CarbonFileInputFormat format = new CarbonFileInputFormat();
+    final Job job = new Job(new Configuration());
+    format.setTableInfo(job.getConfiguration(), table.getTableInfo());
+    format.setTablePath(job.getConfiguration(), table.getTablePath());
+    format.setTableName(job.getConfiguration(), table.getTableName());
+    format.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
+    if (filterExpression != null) {
+      format.setFilterPredicates(job.getConfiguration(), filterExpression);
+    }
+    if (projectionColumns != null) {
+      format.setColumnProjection(job.getConfiguration(), new 
CarbonProjection(projectionColumns));
+    }
+
+    final List<InputSplit> splits =
+        format.getSplits(new JobContextImpl(job.getConfiguration(), new 
JobID()));
+
+    List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size());
+    for (InputSplit split : splits) {
+      TaskAttemptContextImpl attempt =
+          new TaskAttemptContextImpl(job.getConfiguration(), new 
TaskAttemptID());
+      RecordReader reader = format.createRecordReader(split, attempt);
+      reader.initialize(split, attempt);
+      readers.add(reader);
+    }
+
+    return new CarbonReader<>(readers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index 9c07065..68663ec 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -19,11 +19,14 @@ package org.apache.carbondata.sdk.file;
 
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FilenameFilter;
 import java.io.IOException;
 
+import org.apache.carbondata.common.Strings;
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.io.FileUtils;
@@ -135,6 +138,37 @@ public class CSVCarbonWriterTest {
   }
 
   @Test
+  public void testWriteAndReadFiles() throws IOException, InterruptedException 
{
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, true);
+
+    File[] files = new File(path + "/Fact/Part0/Segment_null/").listFiles(new 
FilenameFilter() {
+      @Override public boolean accept(File dir, String name) {
+        return name.endsWith("carbondata");
+      }
+    });
+
+    CarbonReader reader = CarbonReader.builder(path)
+        .projection(new String[]{"name", "age"}).build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[])reader.readNextRow();
+      Assert.assertEquals("robot" + (i % 10), row[0]);
+      Assert.assertEquals(i, row[1]);
+      i++;
+    }
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
   public void testAllPrimitiveDataType() throws IOException {
     // TODO: write all data type and read by CarbonRecordReader to verify the 
content
     String path = "./testWriteFiles";

Reply via email to