Repository: carbondata Updated Branches: refs/heads/master 982d03fea -> df5d7a99e
[CARBONDATA-1998][SDK] Support CarbonReader to read carbondata files Support CarbonReader to read carbondata files This closes #2072 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/df5d7a99 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/df5d7a99 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/df5d7a99 Branch: refs/heads/master Commit: df5d7a99eabc951eed0d3cf4aec8985d83b74160 Parents: c09ef99 Author: Jacky Li <[email protected]> Authored: Sat Mar 17 17:18:24 2018 +0800 Committer: ravipesala <[email protected]> Committed: Fri Mar 23 21:20:24 2018 +0530 ---------------------------------------------------------------------- .../core/indexstore/BlockletDetailInfo.java | 5 +- .../indexstore/blockletindex/IndexWrapper.java | 3 +- .../core/metadata/schema/SchemaReader.java | 97 ++++++++++++++++++++ .../core/metadata/schema/table/CarbonTable.java | 82 ++++++++++++++++- .../schema/table/TableSchemaBuilder.java | 9 ++ .../executor/impl/AbstractQueryExecutor.java | 14 ++- .../core/scan/model/QueryModelBuilder.java | 73 +++++++++++++++ .../carbondata/hadoop/CarbonProjection.java | 11 +++ .../hadoop/api/CarbonFileInputFormat.java | 8 +- .../hadoop/api/CarbonInputFormat.java | 7 +- .../hadoop/api/CarbonTableInputFormat.java | 13 +-- .../hadoop/util/CarbonInputFormatUtil.java | 40 -------- .../carbondata/hadoop/util/SchemaReader.java | 97 -------------------- .../hive/MapredCarbonInputFormat.java | 8 +- .../management/RefreshCarbonTableCommand.scala | 4 +- .../spark/sql/parser/CarbonSparkSqlParser.scala | 2 +- .../carbondata/sdk/file/CarbonReader.java | 64 +++++++++++++ .../sdk/file/CarbonReaderBuilder.java | 95 +++++++++++++++++++ .../sdk/file/CSVCarbonWriterTest.java | 34 +++++++ 19 files changed, 495 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java index 2865d4b..660f4c1 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java @@ -209,7 +209,10 @@ public class BlockletDetailInfo implements Serializable, Writable { this.blockFooterOffset = blockFooterOffset; } - public List<ColumnSchema> getColumnSchemas() { + public List<ColumnSchema> getColumnSchemas() throws IOException { + if (columnSchemas == null && columnSchemaBinary != null) { + readColumnSchema(columnSchemaBinary); + } return columnSchemas; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java index 95232e5..1de3122 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/IndexWrapper.java @@ -16,6 +16,7 @@ */ package org.apache.carbondata.core.indexstore.blockletindex; +import java.io.IOException; import java.util.List; import org.apache.carbondata.core.datastore.block.AbstractIndex; @@ -33,7 +34,7 @@ public class IndexWrapper extends AbstractIndex { private List<TableBlockInfo> blockInfos; - public IndexWrapper(List<TableBlockInfo> blockInfos) { + public 
IndexWrapper(List<TableBlockInfo> blockInfos) throws IOException { this.blockInfos = blockInfos; segmentProperties = new SegmentProperties(blockInfos.get(0).getDetailInfo().getColumnSchemas(), blockInfos.get(0).getDetailInfo().getDimLens()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java new file mode 100644 index 0000000..787a9b9 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.metadata.schema; + +import java.io.IOException; + +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonMetadata; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.converter.SchemaConverter; +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +/** + * TODO: It should be removed after store manager implementation. + */ +public class SchemaReader { + + public static CarbonTable readCarbonTableFromStore(AbsoluteTableIdentifier identifier) + throws IOException { + String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath()); + if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) || + FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) || + FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.S3) || + FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) { + String tableName = identifier.getCarbonTableIdentifier().getTableName(); + + org.apache.carbondata.format.TableInfo tableInfo = + CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(identifier.getTablePath())); + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( + tableInfo, + identifier.getCarbonTableIdentifier().getDatabaseName(), + tableName, + identifier.getTablePath()); + CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo); + return CarbonMetadata.getInstance().getCarbonTable( + 
identifier.getCarbonTableIdentifier().getTableUniqueName()); + } else { + throw new IOException("File does not exist: " + schemaFilePath); + } + } + + /** + * the method returns the Wrapper TableInfo + * + * @param identifier + * @return + */ + public static TableInfo getTableInfo(AbsoluteTableIdentifier identifier) + throws IOException { + org.apache.carbondata.format.TableInfo thriftTableInfo = + CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(identifier.getTablePath())); + ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter = + new ThriftWrapperSchemaConverterImpl(); + CarbonTableIdentifier carbonTableIdentifier = + identifier.getCarbonTableIdentifier(); + return thriftWrapperSchemaConverter.fromExternalToWrapperTableInfo( + thriftTableInfo, + carbonTableIdentifier.getDatabaseName(), + carbonTableIdentifier.getTableName(), + identifier.getTablePath()); + } + + + public static TableInfo inferSchema(AbsoluteTableIdentifier identifier) + throws IOException { + // This routine is going to infer schema from the carbondata file footer + // Convert the ColumnSchema -> TableSchema -> TableInfo. + // Return the TableInfo. 
+ org.apache.carbondata.format.TableInfo tableInfo = + CarbonUtil.inferSchema(identifier.getTablePath(), identifier, false); + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( + tableInfo, identifier.getDatabaseName(), identifier.getTableName(), + identifier.getTablePath()); + return wrapperTableInfo; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 9e0d80a..9d50048 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -30,21 +30,32 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.datatype.StructField; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.BucketingInfo; import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.partition.PartitionType; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonHeaderReader; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; +import org.apache.carbondata.core.scan.filter.TableProvider; +import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer; +import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.model.QueryProjection; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeConverter; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.FileHeader; /** * Mapping class for Carbon actual table @@ -192,6 +203,34 @@ public class CarbonTable implements Serializable { } } + public static CarbonTable buildFromDataFile( + String tableName, String tablePath, String filePath) throws IOException { + CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(filePath); + FileHeader fileHeader = carbonHeaderReader.readHeader(); + TableSchemaBuilder builder = TableSchema.builder(); + ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl(); + for (org.apache.carbondata.format.ColumnSchema column : fileHeader.getColumn_schema()) { + ColumnSchema columnSchema = schemaConverter.fromExternalToWrapperColumnSchema(column); + builder.addColumn( + new StructField(columnSchema.getColumnName(), columnSchema.getDataType()), false); + } + + TableSchema tableSchema = builder.tableName(tableName).build(); + 
TableInfo tableInfo = new TableInfo(); + tableInfo.setFactTable(tableSchema); + tableInfo.setTablePath(tablePath); + tableInfo.setDatabaseName("default"); + tableInfo.setTableUniqueName( + CarbonTable.buildUniqueName("default", tableSchema.getTableName())); + return buildFromTableInfo(tableInfo); + } + + public static CarbonTable buildFromTablePath( + String tableName, String tablePath) throws IOException { + return SchemaReader.readCarbonTableFromStore( + AbsoluteTableIdentifier.from(tablePath, tableName, "default")); + } + /** * @param tableInfo */ @@ -492,6 +531,20 @@ public class CarbonTable implements Serializable { } /** + * Return all dimensions of the table + */ + public List<CarbonDimension> getDimensions() { + return tableDimensionsMap.get(getTableName()); + } + + /** + * Return all measure of the table + */ + public List<CarbonMeasure> getMeasures() { + return tableMeasuresMap.get(getTableName()); + } + + /** * This will give user created order column * * @return @@ -877,7 +930,7 @@ public class CarbonTable implements Serializable { return queryModel; } - private QueryProjection createProjection(String[] projectionColumnNames) { + public QueryProjection createProjection(String[] projectionColumnNames) { String factTableName = getTableName(); QueryProjection projection = new QueryProjection(); // fill dimensions @@ -904,6 +957,33 @@ public class CarbonTable implements Serializable { return projection; } + public void processFilterExpression(Expression filterExpression, + boolean[] isFilterDimensions, boolean[] isFilterMeasures) { + QueryModel.processFilterExpression(this, filterExpression, isFilterDimensions, + isFilterMeasures); + + if (null != filterExpression) { + // Optimize Filter Expression and fit RANGE filters is conditions apply. + FilterOptimizer rangeFilterOptimizer = + new RangeFilterOptmizer(filterExpression); + rangeFilterOptimizer.optimizeFilter(); + } + } + + /** + * Resolve the filter expression. 
+ */ + public FilterResolverIntf resolveFilter(Expression filterExpression, + TableProvider tableProvider) { + try { + FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor(); + return filterExpressionProcessor.getFilterResolver( + filterExpression, getAbsoluteTableIdentifier(), tableProvider); + } catch (Exception e) { + throw new RuntimeException("Error while resolving filter expression", e); + } + } + /** * Create a {@link CarbonTableBuilder} to create {@link CarbonTable} */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index 8fdcbb1..2dd5a9e 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -47,6 +47,8 @@ public class TableSchemaBuilder { private int blockSize; + private String tableName; + public TableSchemaBuilder blockSize(int blockSize) { if (blockSize <= 0) { throw new IllegalArgumentException("blockSize should be greater than 0"); @@ -55,8 +57,15 @@ public class TableSchemaBuilder { return this; } + public TableSchemaBuilder tableName(String tableName) { + Objects.requireNonNull(tableName); + this.tableName = tableName; + return this; + } + public TableSchema build() { TableSchema schema = new TableSchema(); + schema.setTableName(tableName); schema.setTableId(UUID.randomUUID().toString()); schema.setPartitionInfo(null); schema.setBucketingInfo(null); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index 22d1df1..8f77996 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -265,11 +265,15 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { AbstractIndex abstractIndex = queryProperties.dataBlocks.get(i); BlockletDataRefNode dataRefNode = (BlockletDataRefNode) abstractIndex.getDataRefNode(); - blockExecutionInfoList.add(getBlockExecutionInfoForBlock(queryModel, abstractIndex, - dataRefNode.getBlockInfos().get(0).getBlockletInfos().getStartBlockletNumber(), - dataRefNode.numberOfNodes(), dataRefNode.getBlockInfos().get(0).getFilePath(), - dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(), - dataRefNode.getBlockInfos().get(0).getSegmentId())); + blockExecutionInfoList.add( + getBlockExecutionInfoForBlock( + queryModel, + abstractIndex, + dataRefNode.getBlockInfos().get(0).getBlockletInfos().getStartBlockletNumber(), + dataRefNode.numberOfNodes(), + dataRefNode.getBlockInfos().get(0).getFilePath(), + dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(), + dataRefNode.getBlockInfos().get(0).getSegmentId())); } if (null != queryModel.getStatisticsRecorder()) { QueryStatistic queryStatistic = new QueryStatistic(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java new file mode 100644 index 0000000..f40bd8b --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.scan.model; + +import java.util.List; + +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; + +public class QueryModelBuilder { + + private CarbonTable carbonTable; + + public QueryModelBuilder(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + } + + public QueryModel build(String[] projectionColumnNames, Expression filterExpression) { + QueryModel queryModel = QueryModel.newInstance(carbonTable); + QueryProjection projection = carbonTable.createProjection(projectionColumnNames); + queryModel.setProjection(projection); + boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()]; + boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()]; + carbonTable.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures); + queryModel.setIsFilterDimensions(isFilterDimensions); + queryModel.setIsFilterMeasures(isFilterMeasures); + FilterResolverIntf filterIntf = carbonTable.resolveFilter(filterExpression, null); + queryModel.setFilterExpressionResolverTree(filterIntf); + return queryModel; + } + + public QueryModel build(Expression filterExpression) { + QueryProjection projection = new QueryProjection(); + + List<CarbonDimension> dimensions = carbonTable.getDimensions(); + for (int i = 0; i < dimensions.size(); i++) { + projection.addDimension(dimensions.get(i), i); + } + List<CarbonMeasure> measures = carbonTable.getMeasures(); + for (int i = 0; i < measures.size(); i++) { + projection.addMeasure(measures.get(i), i); + } + + QueryModel queryModel = QueryModel.newInstance(carbonTable); + queryModel.setProjection(projection); + boolean[] 
isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()]; + boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()]; + carbonTable.processFilterExpression(filterExpression, isFilterDimensions, isFilterMeasures); + queryModel.setIsFilterDimensions(isFilterDimensions); + queryModel.setIsFilterMeasures(isFilterMeasures); + FilterResolverIntf filterIntf = carbonTable.resolveFilter(filterExpression, null); + queryModel.setFilterExpressionResolverTree(filterIntf); + return queryModel; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java index e236615..b5d0b51 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java @@ -18,6 +18,7 @@ package org.apache.carbondata.hadoop; import java.io.Serializable; import java.util.LinkedHashSet; +import java.util.Objects; import java.util.Set; /** @@ -29,6 +30,16 @@ public class CarbonProjection implements Serializable { private Set<String> columns = new LinkedHashSet<>(); + public CarbonProjection() { + } + + public CarbonProjection(String[] columnNames) { + Objects.requireNonNull(columnNames); + for (String columnName : columnNames) { + columns.add(columnName); + } + } + public void addColumn(String column) { columns.add(column); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java index ff532b7..c0366d2 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.mutate.UpdateVO; @@ -43,8 +44,6 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CarbonInputSplit; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; -import org.apache.carbondata.hadoop.util.SchemaReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; @@ -119,10 +118,9 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se TableProvider tableProvider = new SingleTableProvider(carbonTable); // this will be null in case of corrupt schema file. 
PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName()); - CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null); + carbonTable.processFilterExpression(filter, null, null); - FilterResolverIntf filterInterface = CarbonInputFormatUtil - .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); + FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, tableProvider); String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null"); FileFactory.FileType fileType = FileFactory.getFileType(segmentDir); http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 3cc9c5f..5506af6 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -61,7 +61,6 @@ import org.apache.carbondata.hadoop.CarbonProjection; import org.apache.carbondata.hadoop.CarbonRecordReader; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; import org.apache.commons.logging.Log; @@ -411,12 +410,10 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()]; // getAllMeasures returns list of visible and invisible columns boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()]; - CarbonInputFormatUtil - 
.processFilterExpression(filter, carbonTable, isFilterDimensions, isFilterMeasures); + carbonTable.processFilterExpression(filter, isFilterDimensions, isFilterMeasures); queryModel.setIsFilterDimensions(isFilterDimensions); queryModel.setIsFilterMeasures(isFilterMeasures); - FilterResolverIntf filterIntf = CarbonInputFormatUtil - .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); + FilterResolverIntf filterIntf = carbonTable.resolveFilter(filter, tableProvider); queryModel.setFilterExpressionResolverTree(filterIntf); // update the file level index store if there are invalid segment http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 605ea48..290b3d7 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -37,6 +37,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.partition.PartitionType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; @@ -58,8 +59,6 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.BlockIndex; import 
org.apache.carbondata.hadoop.CarbonInputSplit; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; -import org.apache.carbondata.hadoop.util.SchemaReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -206,7 +205,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { TableProvider tableProvider = new SingleTableProvider(carbonTable); // this will be null in case of corrupt schema file. PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName()); - CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null); + carbonTable.processFilterExpression(filter, null, null); // prune partitions for filter query on partition table BitSet matchedPartitions = null; @@ -221,8 +220,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { } } - FilterResolverIntf filterInterface = CarbonInputFormatUtil - .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); + FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, tableProvider); // do block filtering and get split List<InputSplit> splits = @@ -385,7 +383,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { throw new IOException("Missing/Corrupt schema file for table."); } - CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null); + carbonTable.processFilterExpression(filter, null, null); TableProvider tableProvider = new SingleTableProvider(carbonTable); // prune partitions for filter query on partition table @@ -404,8 +402,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { } } - FilterResolverIntf filterInterface = - CarbonInputFormatUtil.resolveFilter(filter, identifier, tableProvider); + FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, tableProvider); // do block filtering and get split List<InputSplit> splits = getSplits(job, filterInterface, segmentList, 
matchedPartitions, partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(identifier)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index 9f8c5ec..d6d6603 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -25,14 +25,6 @@ import java.util.Locale; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; -import org.apache.carbondata.core.scan.filter.TableProvider; -import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer; -import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; import org.apache.hadoop.conf.Configuration; @@ -71,38 +63,6 @@ public class CarbonInputFormatUtil { return carbonTableInputFormat; } - public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable, - boolean[] isFilterDimensions, boolean[] isFilterMeasures) { - QueryModel.processFilterExpression(carbonTable, filterExpression, isFilterDimensions, - isFilterMeasures); - - if (null != filterExpression) { - 
// Optimize Filter Expression and fit RANGE filters is conditions apply. - FilterOptimizer rangeFilterOptimizer = - new RangeFilterOptmizer(filterExpression); - rangeFilterOptimizer.optimizeFilter(); - } - } - - /** - * Resolve the filter expression. - * - * @param filterExpression - * @param absoluteTableIdentifier - * @return - */ - public static FilterResolverIntf resolveFilter(Expression filterExpression, - AbsoluteTableIdentifier absoluteTableIdentifier, TableProvider tableProvider) { - try { - FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor(); - //get resolved filter - return filterExpressionProcessor - .getFilterResolver(filterExpression, absoluteTableIdentifier, tableProvider); - } catch (Exception e) { - throw new RuntimeException("Error while resolving filter expression", e); - } - } - public static String createJobTrackerID(java.util.Date date) { return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(date); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java deleted file mode 100644 index 9df59e6..0000000 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.hadoop.util; - -import java.io.IOException; - -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.converter.SchemaConverter; -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; - -/** - * TODO: It should be removed after store manager implementation. 
- */ -public class SchemaReader { - - public static CarbonTable readCarbonTableFromStore(AbsoluteTableIdentifier identifier) - throws IOException { - String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath()); - if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) || - FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) || - FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.S3) || - FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) { - String tableName = identifier.getCarbonTableIdentifier().getTableName(); - - org.apache.carbondata.format.TableInfo tableInfo = - CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(identifier.getTablePath())); - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( - tableInfo, - identifier.getCarbonTableIdentifier().getDatabaseName(), - tableName, - identifier.getTablePath()); - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo); - return CarbonMetadata.getInstance().getCarbonTable( - identifier.getCarbonTableIdentifier().getTableUniqueName()); - } else { - throw new IOException("File does not exist: " + schemaFilePath); - } - } - - /** - * the method returns the Wrapper TableInfo - * - * @param identifier - * @return - */ - public static TableInfo getTableInfo(AbsoluteTableIdentifier identifier) - throws IOException { - org.apache.carbondata.format.TableInfo thriftTableInfo = - CarbonUtil.readSchemaFile(CarbonTablePath.getSchemaFilePath(identifier.getTablePath())); - ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter = - new ThriftWrapperSchemaConverterImpl(); - CarbonTableIdentifier carbonTableIdentifier = - identifier.getCarbonTableIdentifier(); - return thriftWrapperSchemaConverter.fromExternalToWrapperTableInfo( - thriftTableInfo, - carbonTableIdentifier.getDatabaseName(), - 
carbonTableIdentifier.getTableName(), - identifier.getTablePath()); - } - - - public static TableInfo inferSchema(AbsoluteTableIdentifier identifier) - throws IOException { - // This routine is going to infer schema from the carbondata file footer - // Convert the ColumnSchema -> TableSchema -> TableInfo. - // Return the TableInfo. - org.apache.carbondata.format.TableInfo tableInfo = - CarbonUtil.inferSchema(identifier.getTablePath(), identifier, false); - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( - tableInfo, identifier.getDatabaseName(), identifier.getTableName(), - identifier.getTablePath()); - return wrapperTableInfo; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java index 1b57f93..4c9e417 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java @@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.exception.InvalidConfigurationException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.SchemaReader; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.scan.expression.Expression; @@ -35,9 +36,7 @@ import 
org.apache.carbondata.core.util.DataTypeConverterImpl; import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; -import org.apache.carbondata.hadoop.util.SchemaReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.InvalidPathException; @@ -146,9 +145,8 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl projectionColumns, new DataTypeConverterImpl()); // set the filter to the query model in order to filter blocklet before scan Expression filter = getFilterPredicates(configuration); - CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null); - FilterResolverIntf filterIntf = - CarbonInputFormatUtil.resolveFilter(filter, identifier, tableProvider); + carbonTable.processFilterExpression(filter, null, null); + FilterResolverIntf filterIntf = carbonTable.resolveFilter(filter, tableProvider); queryModel.setFilterExpressionResolverTree(filterIntf); return queryModel; http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala index a8316e9..1d91458 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala @@ -31,14 
+31,14 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.metadata.schema.SchemaReader import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent} -import org.apache.carbondata.hadoop.util.SchemaReader /** * Command to register carbon table from existing carbon table data http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 55eb5ac..610f9bf 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -35,8 +35,8 @@ import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import 
org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.schema.SchemaReader import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.hadoop.util.SchemaReader import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil} http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java new file mode 100644 index 0000000..8cb8b2c --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.mapreduce.RecordReader; + +public class CarbonReader<T> { + + private List<RecordReader<Void, T>> readers; + + private RecordReader<Void, T> currentReader; + + private int index; + + CarbonReader(List<RecordReader<Void, T>> readers) { + if (readers.size() == 0) { + throw new IllegalArgumentException("no reader"); + } + this.readers = readers; + this.index = 0; + this.currentReader = readers.get(0); + } + + public boolean hasNext() throws IOException, InterruptedException { + if (currentReader.nextKeyValue()) { + return true; + } else { + if (index == readers.size() - 1) { + // no more readers + return false; + } else { + index++; + currentReader = readers.get(index); + return currentReader.nextKeyValue(); + } + } + } + + public T readNextRow() throws IOException, InterruptedException { + return currentReader.getCurrentValue(); + } + + public static CarbonReaderBuilder builder(String tablePath) { + return new CarbonReaderBuilder(tablePath); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java new file mode 100644 index 0000000..894b973 --- /dev/null +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.api.CarbonFileInputFormat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + [email protected] [email protected] +public class CarbonReaderBuilder { + + private String tablePath; + private String[] projectionColumns; + private Expression filterExpression; + + CarbonReaderBuilder(String tablePath) { + this.tablePath = tablePath; + } + + public CarbonReaderBuilder projection(String[] projectionColumnNames) { + Objects.requireNonNull(projectionColumnNames); + this.projectionColumns = projectionColumnNames; + return this; + } + + 
public CarbonReaderBuilder filter(Expression filterExpression) { + Objects.requireNonNull(filterExpression); + this.filterExpression = filterExpression; + return this; + } + + public <T> CarbonReader<T> build() throws IOException, InterruptedException { + CarbonTable table = CarbonTable.buildFromTablePath("_temp", tablePath); + + final CarbonFileInputFormat format = new CarbonFileInputFormat(); + final Job job = new Job(new Configuration()); + format.setTableInfo(job.getConfiguration(), table.getTableInfo()); + format.setTablePath(job.getConfiguration(), table.getTablePath()); + format.setTableName(job.getConfiguration(), table.getTableName()); + format.setDatabaseName(job.getConfiguration(), table.getDatabaseName()); + if (filterExpression != null) { + format.setFilterPredicates(job.getConfiguration(), filterExpression); + } + if (projectionColumns != null) { + format.setColumnProjection(job.getConfiguration(), new CarbonProjection(projectionColumns)); + } + + final List<InputSplit> splits = + format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); + + List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size()); + for (InputSplit split : splits) { + TaskAttemptContextImpl attempt = + new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader reader = format.createRecordReader(split, attempt); + reader.initialize(split, attempt); + readers.add(reader); + } + + return new CarbonReader<>(readers); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/df5d7a99/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index 9c07065..68663ec 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -19,11 +19,14 @@ package org.apache.carbondata.sdk.file; import java.io.File; import java.io.FileFilter; +import java.io.FilenameFilter; import java.io.IOException; +import org.apache.carbondata.common.Strings; import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.expression.logical.TrueExpression; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.commons.io.FileUtils; @@ -135,6 +138,37 @@ public class CSVCarbonWriterTest { } @Test + public void testWriteAndReadFiles() throws IOException, InterruptedException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(new Schema(fields), path, true); + + File[] files = new File(path + "/Fact/Part0/Segment_null/").listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.endsWith("carbondata"); + } + }); + + CarbonReader reader = CarbonReader.builder(path) + .projection(new String[]{"name", "age"}).build(); + + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[])reader.readNextRow(); + Assert.assertEquals("robot" + (i % 10), row[0]); + Assert.assertEquals(i, row[1]); + i++; + } + + FileUtils.deleteDirectory(new File(path)); + } + + @Test public void testAllPrimitiveDataType() throws IOException { // TODO: write all data type and read by CarbonRecordReader to verify the content String path = "./testWriteFiles";
