[CARBONDATA-2224][File Level Reader Support] Review comment fixes and refactoring of #2055
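This refactoring extracts the configuration setters shared by both input formats into the new abstract CarbonInputFormat base class, so callers now configure reads through the base class instead of CarbonTableInputFormat. A minimal caller-side sketch of the updated API, mirroring the HadoopFileExample.scala change in this commit (the database/table names and columns are just the example's values):

    import org.apache.hadoop.conf.Configuration

    import org.apache.carbondata.hadoop.CarbonProjection
    import org.apache.carbondata.hadoop.api.CarbonInputFormat

    // Project only the columns the query needs.
    val projection = new CarbonProjection
    projection.addColumn("c1") // column c1
    projection.addColumn("c3") // column c3

    // The setters moved to the CarbonInputFormat base class are static and
    // write into the Hadoop Configuration that the job later reads.
    val conf = new Configuration()
    CarbonInputFormat.setColumnProjection(conf, projection)
    CarbonInputFormat.setDatabaseName(conf, "default")
    CarbonInputFormat.setTableName(conf, "carbon1")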
This closes #2069 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c09ef998 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c09ef998 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c09ef998 Branch: refs/heads/master Commit: c09ef99895c9ed6e9279aba2b420aa43095b6e50 Parents: 9a25dc6 Author: Ajantha-Bhat <[email protected]> Authored: Fri Mar 16 16:06:04 2018 +0530 Committer: ravipesala <[email protected]> Committed: Fri Mar 23 21:20:24 2018 +0530 ---------------------------------------------------------------------- .../core/metadata/schema/table/CarbonTable.java | 4 +- .../apache/carbondata/core/util/CarbonUtil.java | 2 +- .../carbondata/examples/HadoopFileExample.scala | 8 +- .../hadoop/api/CarbonFileInputFormat.java | 535 ++----------------- .../hadoop/api/CarbonInputFormat.java | 534 ++++++++++++++++++ .../hadoop/api/CarbonTableInputFormat.java | 463 +--------------- .../carbondata/hadoop/util/SchemaReader.java | 8 +- ...FileInputFormatWithExternalCarbonTable.scala | 16 +- ...tCreateTableUsingSparkCarbonFileFormat.scala | 25 +- ...tSparkCarbonFileFormatWithSparkSession.scala | 4 +- .../spark/rdd/CarbonIUDMergerRDD.scala | 6 +- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 6 +- .../spark/rdd/CarbonScanPartitionRDD.scala | 4 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 60 +-- .../org/apache/spark/util/PartitionUtils.scala | 4 +- .../org/apache/spark/sql/CarbonCountStar.scala | 4 +- .../command/mutation/DeleteExecution.scala | 4 +- .../command/table/CarbonDropTableCommand.scala | 2 +- .../datasources/SparkCarbonFileFormat.scala | 15 +- .../sql/execution/strategy/DDLStrategy.scala | 6 +- .../spark/sql/parser/CarbonSparkSqlParser.scala | 8 +- .../spark/sql/hive/CarbonSessionState.scala | 2 +- .../spark/sql/hive/CarbonSessionState.scala | 2 +- .../carbondata/streaming/StreamHandoffRDD.scala | 12 +- 24 files changed, 669 insertions(+), 1065 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 278dc96..9e0d80a 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -826,8 +826,8 @@ public class CarbonTable implements Serializable { return external != null && external.equalsIgnoreCase("true"); } - public boolean isFileLevelExternalTable() { - String external = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal"); + public boolean isFileLevelFormat() { + String external = tableInfo.getFactTable().getTableProperties().get("_filelevelformat"); return external != null && external.equalsIgnoreCase("true"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 337c150..84d3c6c 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -2214,7 +2214,7 @@ public final class CarbonUtil { * @param schemaFilePath * @return */ - public static org.apache.carbondata.format.TableInfo inferSchemaFileExternalTable( + public static org.apache.carbondata.format.TableInfo inferSchema( String carbonDataFilePath, AbsoluteTableIdentifier absoluteTableIdentifier, boolean schemaExists) throws IOException { TBaseCreator createTBase = new ThriftReader.TBaseCreator() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala index d75abc2..465e660 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala @@ -19,7 +19,7 @@ package org.apache.carbondata.examples import org.apache.hadoop.conf.Configuration -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.CarbonProjection // scalastyle:off println @@ -34,9 +34,9 @@ object HadoopFileExample { projection.addColumn("c1") // column c1 projection.addColumn("c3") // column c3 val conf = new Configuration() - CarbonTableInputFormat.setColumnProjection(conf, projection) - CarbonTableInputFormat.setDatabaseName(conf, "default") - CarbonTableInputFormat.setTableName(conf, "carbon1") + CarbonInputFormat.setColumnProjection(conf, projection) + CarbonInputFormat.setDatabaseName(conf, "default") + CarbonInputFormat.setTableName(conf, "carbon1") val sc = spark.sparkContext val input = sc.newAPIHadoopFile(s"${ExampleUtils.storeLocation}/default/carbon1", http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java index b86b1cc..ff532b7 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java @@ -17,11 +17,8 @@ package org.apache.carbondata.hadoop.api; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.BitSet; import java.util.LinkedList; @@ -30,21 +27,11 @@ import java.util.Map; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.DataMapChooser; -import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.Segment; -import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import 
org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.exception.InvalidConfigurationException; -import org.apache.carbondata.core.indexstore.ExtendedBlocklet; -import org.apache.carbondata.core.indexstore.PartitionSpec; -import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.PartitionInfo; -import org.apache.carbondata.core.metadata.schema.partition.PartitionType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.mutate.UpdateVO; @@ -52,241 +39,58 @@ import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.SingleTableProvider; import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.stats.QueryStatistic; -import org.apache.carbondata.core.stats.QueryStatisticsConstants; -import org.apache.carbondata.core.stats.QueryStatisticsRecorder; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeConverter; -import org.apache.carbondata.core.util.DataTypeConverterImpl; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CarbonInputSplit; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; -import org.apache.carbondata.hadoop.CarbonProjection; -import org.apache.carbondata.hadoop.CarbonRecordReader; -import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; -import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; import org.apache.carbondata.hadoop.util.SchemaReader; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.hadoop.mapreduce.security.TokenCache; /** - * Input format of CarbonData file. + * InputFormat for reading carbondata files without table level metadata support, + * the schema is inferred in the following steps: + * 1. read from the schema file if it exists + * 2. 
read from data file footer * * @param <T> */ @InterfaceAudience.User @InterfaceStability.Evolving -public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable { - - public static final String READ_SUPPORT_CLASS = "carbon.read.support.class"; - // comma separated list of input segment numbers - public static final String INPUT_SEGMENT_NUMBERS = - "mapreduce.input.carboninputformat.segmentnumbers"; - private static final String VALIDATE_INPUT_SEGMENT_IDs = - "mapreduce.input.carboninputformat.validsegments"; - // comma separated list of input files - public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files"; - private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid"; - private static final Log LOG = LogFactory.getLog(CarbonFileInputFormat.class); - private static final String FILTER_PREDICATE = - "mapreduce.input.carboninputformat.filter.predicate"; - private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; - private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo"; - private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; - private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter"; - private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr"; - public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName"; - public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; - private static final String PARTITIONS_TO_PRUNE = - "mapreduce.input.carboninputformat.partitions.to.prune"; - public static final String UPADTE_T = - "mapreduce.input.carboninputformat.partitions.to.prune"; +public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Serializable { // a cache for carbon table, it will be used in task side private CarbonTable carbonTable; - /** - * Set the `tableInfo` in `configuration` - */ - public static void setTableInfo(Configuration configuration, TableInfo tableInfo) - throws IOException { - if (null != tableInfo) { - configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize())); - } - } - - /** - * Get TableInfo object from `configuration` - */ - private static TableInfo getTableInfo(Configuration configuration) throws IOException { - String tableInfoStr = configuration.get(TABLE_INFO); - if (tableInfoStr == null) { - return null; + protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + CarbonTable carbonTableTemp; + if (carbonTable == null) { + // carbon table should be created either from deserialized table info (schema saved in + // hive metastore) or by reading schema in HDFS (schema saved in HDFS) + TableInfo tableInfo = getTableInfo(configuration); + CarbonTable localCarbonTable; + if (tableInfo != null) { + localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo); + } else { + String schemaPath = CarbonTablePath + .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath()); + if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) { + TableInfo tableInfoInfer = + SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration)); + localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer); + } else { + localCarbonTable = + SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration)); + } + } + 
this.carbonTable = localCarbonTable; + return localCarbonTable; } else { - TableInfo output = new TableInfo(); - output.readFields( - new DataInputStream( - new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr)))); - return output; - } - } - - - public static void setTablePath(Configuration configuration, String tablePath) { - configuration.set(FileInputFormat.INPUT_DIR, tablePath); - } - - public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) { - configuration.set(ALTER_PARTITION_ID, partitionIds.toString()); - } - - - public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob) - throws IOException { - if (dataMapJob != null) { - String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob); - configuration.set(DATA_MAP_DSTR, toString); - } - } - - public static DataMapJob getDataMapJob(Configuration configuration) throws IOException { - String jobString = configuration.get(DATA_MAP_DSTR); - if (jobString != null) { - return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString); - } - return null; - } - - /** - * It sets unresolved filter expression. - * - * @param configuration - * @param filterExpression - */ - public static void setFilterPredicates(Configuration configuration, Expression filterExpression) { - if (filterExpression == null) { - return; - } - try { - String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression); - configuration.set(FILTER_PREDICATE, filterString); - } catch (Exception e) { - throw new RuntimeException("Error while setting filter expression to Job", e); - } - } - - public static void setColumnProjection(Configuration configuration, CarbonProjection projection) { - if (projection == null || projection.isEmpty()) { - return; - } - String[] allColumns = projection.getAllColumns(); - StringBuilder builder = new StringBuilder(); - for (String column : allColumns) { - builder.append(column).append(","); - } - String columnString = builder.toString(); - columnString = columnString.substring(0, columnString.length() - 1); - configuration.set(COLUMN_PROJECTION, columnString); - } - - public static String getColumnProjection(Configuration configuration) { - return configuration.get(COLUMN_PROJECTION); - } - - - /** - * Set list of segments to access - */ - public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) { - configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments)); - } - - /** - * Set `CARBON_INPUT_SEGMENTS` from property to configuration - */ - public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) { - String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase(); - String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase(); - String segmentNumbersFromProperty = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." 
+ tbName, "*"); - if (!segmentNumbersFromProperty.trim().equals("*")) { - CarbonFileInputFormat - .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(","))); - } - } - - /** - * set list of segment to access - */ - public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) { - configuration.set(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString()); - } - - /** - * get list of segment to access - */ - public static boolean getValidateSegmentsToAccess(Configuration configuration) { - return configuration.get(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true") - .equalsIgnoreCase("true"); - } - - /** - * set list of partitions to prune - */ - public static void setPartitionsToPrune(Configuration configuration, - List<PartitionSpec> partitions) { - if (partitions == null) { - return; - } - try { - String partitionString = - ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions)); - configuration.set(PARTITIONS_TO_PRUNE, partitionString); - } catch (Exception e) { - throw new RuntimeException("Error while setting patition information to Job", e); - } - } - - /** - * get list of partitions to prune - */ - private static List<PartitionSpec> getPartitionsToPrune(Configuration configuration) - throws IOException { - String partitionString = configuration.get(PARTITIONS_TO_PRUNE); - if (partitionString != null) { - return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString); - } - return null; - } - - public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) - throws IOException { - String tablePath = configuration.get(INPUT_DIR, ""); - try { - return AbsoluteTableIdentifier - .from(tablePath, getDatabaseName(configuration), getTableName(configuration)); - } catch (InvalidConfigurationException e) { - throw new IOException(e); + carbonTableTemp = this.carbonTable; + return carbonTableTemp; } } @@ -306,8 +110,6 @@ public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implement if (null == carbonTable) { throw new IOException("Missing/Corrupt schema file for table."); } - // TableDataMap blockletMap = DataMapStoreManager.getInstance() - // .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName()); if (getValidateSegmentsToAccess(job.getConfiguration())) { // get all valid segments and set them into the configuration @@ -346,8 +148,6 @@ public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implement return null; } - - /** * {@inheritDoc} * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS @@ -404,279 +204,4 @@ public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implement } return result; } - - protected Expression getFilterPredicates(Configuration configuration) { - try { - String filterExprString = configuration.get(FILTER_PREDICATE); - if (filterExprString == null) { - return null; - } - Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString); - return (Expression) filter; - } catch (IOException e) { - throw new RuntimeException("Error while reading filter expression", e); - } - } - - /** - * get data blocks of given segment - */ - private List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, - AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, - BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo, - List<Integer> oldPartitionIdList) 
throws IOException { - - QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); - QueryStatistic statistic = new QueryStatistic(); - - // get tokens for all the required FileSystem for table path - TokenCache.obtainTokensForNamenodes(job.getCredentials(), - new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration()); - boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, - CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT)); - DataMapExprWrapper dataMapExprWrapper = - DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver); - DataMapJob dataMapJob = getDataMapJob(job.getConfiguration()); - List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration()); - List<ExtendedBlocklet> prunedBlocklets; - if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) { - DistributableDataMapFormat datamapDstr = - new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper, - segmentIds, partitionsToPrune, - BlockletDataMapFactory.class.getName()); - prunedBlocklets = dataMapJob.execute(datamapDstr, resolver); - // Apply expression on the blocklets. - prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets); - } else { - prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune); - } - - List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>(); - int partitionIndex = 0; - List<Integer> partitionIdList = new ArrayList<>(); - if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { - partitionIdList = partitionInfo.getPartitionIds(); - } - for (ExtendedBlocklet blocklet : prunedBlocklets) { - long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo( - CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath())); - - // OldPartitionIdList is only used in alter table partition command because it change - // partition info first and then read data. - // For other normal query should use newest partitionIdList - if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { - if (oldPartitionIdList != null) { - partitionIndex = oldPartitionIdList.indexOf((int)partitionId); - } else { - partitionIndex = partitionIdList.indexOf((int)partitionId); - } - } - if (partitionIndex != -1) { - // matchedPartitions variable will be null in two cases as follows - // 1. the table is not a partition table - // 2. the table is a partition table, and all partitions are matched by query - // for partition table, the task id of carbaondata file name is the partition id. - // if this partition is not required, here will skip it. 
- if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) { - CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet); - if (inputSplit != null) { - resultFilterredBlocks.add(inputSplit); - } - } - } - } - statistic - .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); - recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id")); - return resultFilterredBlocks; - } - - private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException { - CarbonInputSplit split = - CarbonInputSplit.from(blocklet.getSegmentId(), - blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0, - blocklet.getLength(), blocklet.getLocations()), - ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()), - blocklet.getDataMapWriterPath()); - split.setDetailInfo(blocklet.getDetailInfo()); - return split; - } - - @Override - public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, - TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - Configuration configuration = taskAttemptContext.getConfiguration(); - QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); - CarbonReadSupport<T> readSupport = getReadSupportClass(configuration); - return new CarbonRecordReader<T>(queryModel, readSupport); - } - - public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException { - Configuration configuration = taskAttemptContext.getConfiguration(); - CarbonTable carbonTable = getOrCreateCarbonTable(configuration); - TableProvider tableProvider = new SingleTableProvider(carbonTable); - - // query plan includes projection column - String projectionString = getColumnProjection(configuration); - String[] projectionColumnNames = null; - if (projectionString != null) { - projectionColumnNames = projectionString.split(","); - } - QueryModel queryModel = carbonTable.createQueryWithProjection( - projectionColumnNames, getDataTypeConverter(configuration)); - - // set the filter to the query model in order to filter blocklet before scan - Expression filter = getFilterPredicates(configuration); - boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()]; - // getAllMeasures returns list of visible and invisible columns - boolean[] isFilterMeasures = - new boolean[carbonTable.getAllMeasures().size()]; - CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions, - isFilterMeasures); - queryModel.setIsFilterDimensions(isFilterDimensions); - queryModel.setIsFilterMeasures(isFilterMeasures); - FilterResolverIntf filterIntf = CarbonInputFormatUtil - .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); - queryModel.setFilterExpressionResolverTree(filterIntf); - - // update the file level index store if there are invalid segment - if (inputSplit instanceof CarbonMultiBlockSplit) { - CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit; - List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments(); - if (invalidSegments.size() > 0) { - queryModel.setInvalidSegmentIds(invalidSegments); - } - List<UpdateVO> invalidTimestampRangeList = - split.getAllSplits().get(0).getInvalidTimestampRange(); - if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) { - queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList); - } - } - return queryModel; - 
} - - private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { - CarbonTable carbonTableTemp; - if (carbonTable == null) { - // carbon table should be created either from deserialized table info (schema saved in - // hive metastore) or by reading schema in HDFS (schema saved in HDFS) - TableInfo tableInfo = getTableInfo(configuration); - CarbonTable localCarbonTable; - if (tableInfo != null) { - localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo); - } else { - String schemaPath = CarbonTablePath - .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath()); - if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) { - TableInfo tableInfoInfer = - SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration)); - localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer); - } else { - localCarbonTable = - SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration)); - } - } - this.carbonTable = localCarbonTable; - return localCarbonTable; - } else { - carbonTableTemp = this.carbonTable; - return carbonTableTemp; - } - } - - - public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) { - String readSupportClass = configuration.get(CARBON_READ_SUPPORT); - //By default it uses dictionary decoder read class - CarbonReadSupport<T> readSupport = null; - if (readSupportClass != null) { - try { - Class<?> myClass = Class.forName(readSupportClass); - Constructor<?> constructor = myClass.getConstructors()[0]; - Object object = constructor.newInstance(); - if (object instanceof CarbonReadSupport) { - readSupport = (CarbonReadSupport) object; - } - } catch (ClassNotFoundException ex) { - LOG.error("Class " + readSupportClass + "not found", ex); - } catch (Exception ex) { - LOG.error("Error while creating " + readSupportClass, ex); - } - } else { - readSupport = new DictionaryDecodeReadSupport<>(); - } - return readSupport; - } - - @Override - protected boolean isSplitable(JobContext context, Path filename) { - try { - // Don't split the file if it is local file system - FileSystem fileSystem = filename.getFileSystem(context.getConfiguration()); - if (fileSystem instanceof LocalFileSystem) { - return false; - } - } catch (Exception e) { - return true; - } - return true; - } - - /** - * return valid segment to access - */ - private String[] getSegmentsToAccess(JobContext job) { - String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); - if (segmentString.trim().isEmpty()) { - return new String[0]; - } - return segmentString.split(","); - } - - public static DataTypeConverter getDataTypeConverter(Configuration configuration) - throws IOException { - String converter = configuration.get(CARBON_CONVERTER); - if (converter == null) { - return new DataTypeConverterImpl(); - } - return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter); - } - - public static void setDatabaseName(Configuration configuration, String databaseName) { - if (null != databaseName) { - configuration.set(DATABASE_NAME, databaseName); - } - } - - public static String getDatabaseName(Configuration configuration) - throws InvalidConfigurationException { - String databseName = configuration.get(DATABASE_NAME); - if (null == databseName) { - throw new InvalidConfigurationException("Database name is not set."); - } - return databseName; - } - - public static void setTableName(Configuration configuration, String tableName) { - if (null != tableName) { - 
configuration.set(TABLE_NAME, tableName); - } - } - - public static String getTableName(Configuration configuration) - throws InvalidConfigurationException { - String tableName = configuration.get(TABLE_NAME); - if (tableName == null) { - throw new InvalidConfigurationException("Table name is not set"); - } - return tableName; - } - - public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader( - org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter) - throws IOException { - return null; - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java new file mode 100644 index 0000000..3cc9c5f --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop.api; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapLevel; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; +import org.apache.carbondata.core.exception.InvalidConfigurationException; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.partition.PartitionType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeConverter; +import org.apache.carbondata.core.util.DataTypeConverterImpl; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.security.TokenCache; + +/** + * Base class for carbondata input formats. There are two implementations: + * 1. CarbonFileInputFormat: for reading carbondata files without table level metadata support. 
+ * + * 2. CarbonTableInputFormat: for reading carbondata files with table level metadata support, + * such as segment and explicit schema metadata. + * + * @param <T> + */ +public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { + // comma separated list of input segment numbers + public static final String INPUT_SEGMENT_NUMBERS = + "mapreduce.input.carboninputformat.segmentnumbers"; + private static final String VALIDATE_INPUT_SEGMENT_IDs = + "mapreduce.input.carboninputformat.validsegments"; + // partition id list for the alter table partition command + private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid"; + private static final Log LOG = LogFactory.getLog(CarbonInputFormat.class); + private static final String FILTER_PREDICATE = + "mapreduce.input.carboninputformat.filter.predicate"; + private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; + private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo"; + private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; + private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter"; + private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr"; + public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName"; + public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; + private static final String PARTITIONS_TO_PRUNE = + "mapreduce.input.carboninputformat.partitions.to.prune"; + + /** + * Set the `tableInfo` in `configuration` + */ + public static void setTableInfo(Configuration configuration, TableInfo tableInfo) + throws IOException { + if (null != tableInfo) { + configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize())); + } + } + + /** + * Get TableInfo object from `configuration` + */ + protected static TableInfo getTableInfo(Configuration configuration) throws IOException { + String tableInfoStr = configuration.get(TABLE_INFO); + if (tableInfoStr == null) { + return null; + } else { + TableInfo output = new TableInfo(); + output.readFields(new DataInputStream( + new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr)))); + return output; + } + } + + /** + * Get the cached CarbonTable or create it by TableInfo in `configuration` + */ + protected abstract CarbonTable getOrCreateCarbonTable(Configuration configuration) + throws IOException; + + public static void setTablePath(Configuration configuration, String tablePath) { + configuration.set(FileInputFormat.INPUT_DIR, tablePath); + } + + public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) { + configuration.set(ALTER_PARTITION_ID, partitionIds.toString()); + } + + public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob) + throws IOException { + if (dataMapJob != null) { + String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob); + configuration.set(DATA_MAP_DSTR, toString); + } + } + + public static DataMapJob getDataMapJob(Configuration configuration) throws IOException { + String jobString = configuration.get(DATA_MAP_DSTR); + if (jobString != null) { + return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString); + } + return null; + } + + /** + * It sets the unresolved filter expression. 
+ * + * @param configuration + * @param filterExpression + */ + public static void setFilterPredicates(Configuration configuration, Expression filterExpression) { + if (filterExpression == null) { + return; + } + try { + String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression); + configuration.set(FILTER_PREDICATE, filterString); + } catch (Exception e) { + throw new RuntimeException("Error while setting filter expression to Job", e); + } + } + + public static void setColumnProjection(Configuration configuration, CarbonProjection projection) { + if (projection == null || projection.isEmpty()) { + return; + } + String[] allColumns = projection.getAllColumns(); + StringBuilder builder = new StringBuilder(); + for (String column : allColumns) { + builder.append(column).append(","); + } + String columnString = builder.toString(); + columnString = columnString.substring(0, columnString.length() - 1); + configuration.set(COLUMN_PROJECTION, columnString); + } + + public static String getColumnProjection(Configuration configuration) { + return configuration.get(COLUMN_PROJECTION); + } + + /** + * Set list of segments to access + */ + public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) { + configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments)); + } + + /** + * Set `CARBON_INPUT_SEGMENTS` from property to configuration + */ + public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) { + String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase(); + String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase(); + String segmentNumbersFromProperty = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." 
+ tbName, "*"); + if (!segmentNumbersFromProperty.trim().equals("*")) { + CarbonInputFormat + .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(","))); + } + } + + /** + * set whether to validate segments to access + */ + public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) { + configuration.set(CarbonInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString()); + } + + /** + * get whether to validate segments to access + */ + public static boolean getValidateSegmentsToAccess(Configuration configuration) { + return configuration.get(CarbonInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true") + .equalsIgnoreCase("true"); + } + + /** + * set list of partitions to prune + */ + public static void setPartitionsToPrune(Configuration configuration, + List<PartitionSpec> partitions) { + if (partitions == null) { + return; + } + try { + String partitionString = + ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions)); + configuration.set(PARTITIONS_TO_PRUNE, partitionString); + } catch (Exception e) { + throw new RuntimeException("Error while setting partition information to Job: " + partitions, e); + } + } + + /** + * get list of partitions to prune + */ + public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration) + throws IOException { + String partitionString = configuration.get(PARTITIONS_TO_PRUNE); + if (partitionString != null) { + return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString); + } + return null; + } + + public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) + throws IOException { + String tablePath = configuration.get(INPUT_DIR, ""); + try { + return AbsoluteTableIdentifier + .from(tablePath, getDatabaseName(configuration), getTableName(configuration)); + } catch (InvalidConfigurationException e) { + throw new IOException(e); + } + } + + /** + * {@inheritDoc} + * Configuration FileInputFormat.INPUT_DIR + * is used to get the table path to read. 
+ * + * @param job + * @return List<InputSplit> list of CarbonInputSplit + * @throws IOException + */ + @Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException; + + protected Expression getFilterPredicates(Configuration configuration) { + try { + String filterExprString = configuration.get(FILTER_PREDICATE); + if (filterExprString == null) { + return null; + } + Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString); + return (Expression) filter; + } catch (IOException e) { + throw new RuntimeException("Error while reading filter expression", e); + } + } + + /** + * get data blocks of the given segments + */ + protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, + AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, + BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo, + List<Integer> oldPartitionIdList) throws IOException { + + QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); + QueryStatistic statistic = new QueryStatistic(); + + // get tokens for all the required FileSystem for table path + TokenCache.obtainTokensForNamenodes(job.getCredentials(), + new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration()); + boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, + CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT)); + DataMapExprWrapper dataMapExprWrapper = + DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver); + DataMapJob dataMapJob = getDataMapJob(job.getConfiguration()); + List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration()); + List<ExtendedBlocklet> prunedBlocklets; + if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) { + DistributableDataMapFormat datamapDstr = + new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper, segmentIds, + partitionsToPrune, BlockletDataMapFactory.class.getName()); + prunedBlocklets = dataMapJob.execute(datamapDstr, resolver); + // Apply expression on the blocklets. + prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets); + } else { + prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune); + } + + List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>(); + int partitionIndex = 0; + List<Integer> partitionIdList = new ArrayList<>(); + if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { + partitionIdList = partitionInfo.getPartitionIds(); + } + for (ExtendedBlocklet blocklet : prunedBlocklets) { + long partitionId = CarbonTablePath.DataFileUtil + .getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath())); + + // oldPartitionIdList is only used in the alter table partition command because it changes + // partition info first and then reads data. + // Other normal queries should use the newest partitionIdList + if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { + if (oldPartitionIdList != null) { + partitionIndex = oldPartitionIdList.indexOf((int) partitionId); + } else { + partitionIndex = partitionIdList.indexOf((int) partitionId); + } + } + if (partitionIndex != -1) { + // matchedPartitions variable will be null in two cases as follows + // 1. the table is not a partition table + // 2. 
the table is a partition table, and all partitions are matched by query + // for a partition table, the task id of the carbondata file name is the partition id. + // if this partition is not required, it is skipped here. + if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) { + CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet); + if (inputSplit != null) { + resultFilteredBlocks.add(inputSplit); + } + } + } + } + statistic + .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); + recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id")); + return resultFilteredBlocks; + } + + private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException { + CarbonInputSplit split = CarbonInputSplit + .from(blocklet.getSegmentId(), blocklet.getBlockletId(), + new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(), + blocklet.getLocations()), + ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()), + blocklet.getDataMapWriterPath()); + split.setDetailInfo(blocklet.getDetailInfo()); + return split; + } + + @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + Configuration configuration = taskAttemptContext.getConfiguration(); + QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); + CarbonReadSupport<T> readSupport = getReadSupportClass(configuration); + return new CarbonRecordReader<T>(queryModel, readSupport); + } + + public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException { + Configuration configuration = taskAttemptContext.getConfiguration(); + CarbonTable carbonTable = getOrCreateCarbonTable(configuration); + TableProvider tableProvider = new SingleTableProvider(carbonTable); + + // query plan includes projection columns + String projectionString = getColumnProjection(configuration); + String[] projectionColumnNames = null; + if (projectionString != null) { + projectionColumnNames = projectionString.split(","); + } + QueryModel queryModel = carbonTable + .createQueryWithProjection(projectionColumnNames, getDataTypeConverter(configuration)); + + // set the filter to the query model in order to filter blocklets before scan + Expression filter = getFilterPredicates(configuration); + boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()]; + // getAllMeasures returns list of visible and invisible columns + boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()]; + CarbonInputFormatUtil + .processFilterExpression(filter, carbonTable, isFilterDimensions, isFilterMeasures); + queryModel.setIsFilterDimensions(isFilterDimensions); + queryModel.setIsFilterMeasures(isFilterMeasures); + FilterResolverIntf filterIntf = CarbonInputFormatUtil + .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); + queryModel.setFilterExpressionResolverTree(filterIntf); + + // update the file level index store if there are invalid segments + if (inputSplit instanceof CarbonMultiBlockSplit) { + CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit; + List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments(); + if (invalidSegments.size() > 0) { + queryModel.setInvalidSegmentIds(invalidSegments); + } + List<UpdateVO> invalidTimestampRangeList = + 
split.getAllSplits().get(0).getInvalidTimestampRange(); + if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) { + queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList); + } + } + return queryModel; + } + + public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) { + String readSupportClass = configuration.get(CARBON_READ_SUPPORT); + // By default it uses the dictionary decoder read class + CarbonReadSupport<T> readSupport = null; + if (readSupportClass != null) { + try { + Class<?> myClass = Class.forName(readSupportClass); + Constructor<?> constructor = myClass.getConstructors()[0]; + Object object = constructor.newInstance(); + if (object instanceof CarbonReadSupport) { + readSupport = (CarbonReadSupport) object; + } + } catch (ClassNotFoundException ex) { + LOG.error("Class " + readSupportClass + " not found", ex); + } catch (Exception ex) { + LOG.error("Error while creating " + readSupportClass, ex); + } + } else { + readSupport = new DictionaryDecodeReadSupport<>(); + } + return readSupport; + } + + @Override protected boolean isSplitable(JobContext context, Path filename) { + try { + // Don't split the file if it is on a local file system + FileSystem fileSystem = filename.getFileSystem(context.getConfiguration()); + if (fileSystem instanceof LocalFileSystem) { + return false; + } + } catch (Exception e) { + return true; + } + return true; + } + + public static void setCarbonReadSupport(Configuration configuration, + Class<? extends CarbonReadSupport> readSupportClass) { + if (readSupportClass != null) { + configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName()); + } + } + + /** + * Setting a converter is optional; if the user does not set one, the default + * DataTypeConverterImpl is used. + * + * @param configuration + * @param converter is the data type converter for different computing engines + * @throws IOException + */ + public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter) + throws IOException { + if (null != converter) { + configuration.set(CARBON_CONVERTER, + ObjectSerializationUtil.convertObjectToString(converter)); + } + } + + public static DataTypeConverter getDataTypeConverter(Configuration configuration) + throws IOException { + String converter = configuration.get(CARBON_CONVERTER); + if (converter == null) { + return new DataTypeConverterImpl(); + } + return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter); + } + + public static void setDatabaseName(Configuration configuration, String databaseName) { + if (null != databaseName) { + configuration.set(DATABASE_NAME, databaseName); + } + } + + public static String getDatabaseName(Configuration configuration) + throws InvalidConfigurationException { + String databaseName = configuration.get(DATABASE_NAME); + if (null == databaseName) { + throw new InvalidConfigurationException("Database name is not set."); + } + return databaseName; + } + + public static void setTableName(Configuration configuration, String tableName) { + if (null != tableName) { + configuration.set(TABLE_NAME, tableName); + } + } + + public static String getTableName(Configuration configuration) + throws InvalidConfigurationException { + String tableName = configuration.get(TABLE_NAME); + if (tableName == null) { + throw new InvalidConfigurationException("Table name is not set."); + } + return tableName; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 11121e9..605ea48 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -17,11 +17,8 @@ package org.apache.carbondata.hadoop.api; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.io.File; import java.io.IOException; -import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -31,21 +28,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.DataMapChooser; -import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.TableDataMap; -import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.exception.InvalidConfigurationException; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; -import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.metadata.schema.partition.PartitionType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -60,29 +50,15 @@ import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.core.scan.filter.SingleTableProvider; import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.stats.QueryStatistic; -import org.apache.carbondata.core.stats.QueryStatisticsConstants; -import org.apache.carbondata.core.stats.QueryStatisticsRecorder; import org.apache.carbondata.core.statusmanager.FileFormat; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeConverter; -import org.apache.carbondata.core.util.DataTypeConverterImpl; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.BlockIndex; import org.apache.carbondata.hadoop.CarbonInputSplit; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; -import org.apache.carbondata.hadoop.CarbonProjection; -import org.apache.carbondata.hadoop.CarbonRecordReader; -import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; 
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; -import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; import org.apache.carbondata.hadoop.util.SchemaReader; import org.apache.commons.logging.Log; @@ -91,80 +67,38 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.hadoop.mapreduce.security.TokenCache; /** - * Input format of CarbonData file. + * InputFormat for reading carbondata files with table level metadata support, + * such as segment and explicit schema metadata. * * @param <T> */ -public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { +public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { // comma separated list of input segment numbers public static final String INPUT_SEGMENT_NUMBERS = "mapreduce.input.carboninputformat.segmentnumbers"; - private static final String VALIDATE_INPUT_SEGMENT_IDs = - "mapreduce.input.carboninputformat.validsegments"; // comma separated list of input files public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files"; private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid"; private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class); - private static final String FILTER_PREDICATE = - "mapreduce.input.carboninputformat.filter.predicate"; - private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; - private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo"; private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter"; - private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr"; public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName"; public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; - private static final String PARTITIONS_TO_PRUNE = - "mapreduce.input.carboninputformat.partitions.to.prune"; - public static final String UPADTE_T = - "mapreduce.input.carboninputformat.partitions.to.prune"; - // a cache for carbon table, it will be used in task side private CarbonTable carbonTable; /** - * Set the `tableInfo` in `configuration` - */ - public static void setTableInfo(Configuration configuration, TableInfo tableInfo) - throws IOException { - if (null != tableInfo) { - configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize())); - } - } - - /** - * Get TableInfo object from `configuration` - */ - private static TableInfo getTableInfo(Configuration configuration) throws IOException { - String tableInfoStr = configuration.get(TABLE_INFO); - if (tableInfoStr == null) { - return null; - } else { - TableInfo output = new TableInfo(); - output.readFields( - 
new DataInputStream( - new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr)))); - return output; - } - } - - /** * Get the cached CarbonTable or create it by TableInfo in `configuration` */ - private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { if (carbonTable == null) { // carbon table should be created either from deserialized table info (schema saved in // hive metastore) or by reading schema in HDFS (schema saved in HDFS) @@ -183,150 +117,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { } } - public static void setTablePath(Configuration configuration, String tablePath) { - configuration.set(FileInputFormat.INPUT_DIR, tablePath); - } - - public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) { - configuration.set(ALTER_PARTITION_ID, partitionIds.toString()); - } - - - public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob) - throws IOException { - if (dataMapJob != null) { - String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob); - configuration.set(DATA_MAP_DSTR, toString); - } - } - - private static DataMapJob getDataMapJob(Configuration configuration) throws IOException { - String jobString = configuration.get(DATA_MAP_DSTR); - if (jobString != null) { - return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString); - } - return null; - } - - /** - * It sets unresolved filter expression. - * - * @param configuration - * @param filterExpression - */ - public static void setFilterPredicates(Configuration configuration, Expression filterExpression) { - if (filterExpression == null) { - return; - } - try { - String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression); - configuration.set(FILTER_PREDICATE, filterString); - } catch (Exception e) { - throw new RuntimeException("Error while setting filter expression to Job", e); - } - } - - public static void setColumnProjection(Configuration configuration, CarbonProjection projection) { - if (projection == null || projection.isEmpty()) { - return; - } - String[] allColumns = projection.getAllColumns(); - StringBuilder builder = new StringBuilder(); - for (String column : allColumns) { - builder.append(column).append(","); - } - String columnString = builder.toString(); - columnString = columnString.substring(0, columnString.length() - 1); - configuration.set(COLUMN_PROJECTION, columnString); - } - - public static String getColumnProjection(Configuration configuration) { - return configuration.get(COLUMN_PROJECTION); - } - - public static void setCarbonReadSupport(Configuration configuration, - Class<? 
extends CarbonReadSupport> readSupportClass) { - if (readSupportClass != null) { - configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName()); - } - } - - /** - * Set list of segments to access - */ - public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) { - configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments)); - } - - /** - * Set `CARBON_INPUT_SEGMENTS` from property to configuration - */ - public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) { - String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase(); - String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase(); - String segmentNumbersFromProperty = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*"); - if (!segmentNumbersFromProperty.trim().equals("*")) { - CarbonTableInputFormat - .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(","))); - } - } - - /** - * set list of segment to access - */ - public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) { - configuration.set(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString()); - } - - /** - * get list of segment to access - */ - public static boolean getValidateSegmentsToAccess(Configuration configuration) { - return configuration.get(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true") - .equalsIgnoreCase("true"); - } - - /** - * set list of partitions to prune - */ - public static void setPartitionsToPrune(Configuration configuration, - List<PartitionSpec> partitions) { - if (partitions == null) { - return; - } - try { - String partitionString = - ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions)); - configuration.set(PARTITIONS_TO_PRUNE, partitionString); - } catch (Exception e) { - throw new RuntimeException("Error while setting patition information to Job" + partitions, e); - } - } - - /** - * get list of partitions to prune - */ - public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration) - throws IOException { - String partitionString = configuration.get(PARTITIONS_TO_PRUNE); - if (partitionString != null) { - return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString); - } - return null; - } - - public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) - throws IOException { - String tablePath = configuration.get(INPUT_DIR, ""); - try { - return AbsoluteTableIdentifier - .from(tablePath, getDatabaseName(configuration), getTableName(configuration)); - } catch (InvalidConfigurationException e) { - throw new IOException(e); - } - } - /** * {@inheritDoc} * Configurations FileInputFormat.INPUT_DIR @@ -363,8 +153,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { return getSplitsOfStreaming(job, identifier, streamSegments); } - - List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments()); if (filteredSegmentToAccess.size() == 0) { return getSplitsOfStreaming(job, identifier, streamSegments); @@ -717,195 +505,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { return result; } - protected Expression getFilterPredicates(Configuration configuration) { - try { - String filterExprString = configuration.get(FILTER_PREDICATE); - if (filterExprString == null) { - return 
null; - } - Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString); - return (Expression) filter; - } catch (IOException e) { - throw new RuntimeException("Error while reading filter expression", e); - } - } - - /** - * get data blocks of given segment - */ - private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job, - AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, - BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo, - List<Integer> oldPartitionIdList) throws IOException { - - QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); - QueryStatistic statistic = new QueryStatistic(); - - // get tokens for all the required FileSystem for table path - TokenCache.obtainTokensForNamenodes(job.getCredentials(), - new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration()); - boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, - CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT)); - DataMapExprWrapper dataMapExprWrapper = - DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver); - DataMapJob dataMapJob = getDataMapJob(job.getConfiguration()); - List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration()); - List<ExtendedBlocklet> prunedBlocklets; - if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) { - DistributableDataMapFormat datamapDstr = - new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper, - segmentIds, partitionsToPrune, - BlockletDataMapFactory.class.getName()); - prunedBlocklets = dataMapJob.execute(datamapDstr, resolver); - // Apply expression on the blocklets. - prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets); - } else { - prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune); - } - - List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>(); - int partitionIndex = 0; - List<Integer> partitionIdList = new ArrayList<>(); - if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { - partitionIdList = partitionInfo.getPartitionIds(); - } - for (ExtendedBlocklet blocklet : prunedBlocklets) { - long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo( - CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath())); - - // OldPartitionIdList is only used in alter table partition command because it change - // partition info first and then read data. - // For other normal query should use newest partitionIdList - if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { - if (oldPartitionIdList != null) { - partitionIndex = oldPartitionIdList.indexOf((int)partitionId); - } else { - partitionIndex = partitionIdList.indexOf((int)partitionId); - } - } - if (partitionIndex != -1) { - // matchedPartitions variable will be null in two cases as follows - // 1. the table is not a partition table - // 2. the table is a partition table, and all partitions are matched by query - // for partition table, the task id of carbaondata file name is the partition id. - // if this partition is not required, here will skip it. 
- if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) { - CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet); - if (inputSplit != null) { - resultFilterredBlocks.add(inputSplit); - } - } - } - } - statistic - .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); - recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id")); - return resultFilterredBlocks; - } - - private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException { - org.apache.carbondata.hadoop.CarbonInputSplit split = - org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(), - blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0, - blocklet.getLength(), blocklet.getLocations()), - ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()), - blocklet.getDataMapWriterPath()); - split.setDetailInfo(blocklet.getDetailInfo()); - return split; - } - - @Override - public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, - TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - Configuration configuration = taskAttemptContext.getConfiguration(); - QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); - CarbonReadSupport<T> readSupport = getReadSupportClass(configuration); - return new CarbonRecordReader<T>(queryModel, readSupport); - } - - public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException { - Configuration configuration = taskAttemptContext.getConfiguration(); - CarbonTable carbonTable = getOrCreateCarbonTable(configuration); - TableProvider tableProvider = new SingleTableProvider(carbonTable); - - // query plan includes projection column - String projectionString = getColumnProjection(configuration); - String[] projectionColumnNames = null; - if (projectionString != null) { - projectionColumnNames = projectionString.split(","); - } - QueryModel queryModel = carbonTable.createQueryWithProjection( - projectionColumnNames, getDataTypeConverter(configuration)); - - // set the filter to the query model in order to filter blocklet before scan - Expression filter = getFilterPredicates(configuration); - boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()]; - // getAllMeasures returns list of visible and invisible columns - boolean[] isFilterMeasures = - new boolean[carbonTable.getAllMeasures().size()]; - CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions, - isFilterMeasures); - queryModel.setIsFilterDimensions(isFilterDimensions); - queryModel.setIsFilterMeasures(isFilterMeasures); - FilterResolverIntf filterIntf = CarbonInputFormatUtil - .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); - queryModel.setFilterExpressionResolverTree(filterIntf); - - // update the file level index store if there are invalid segment - if (inputSplit instanceof CarbonMultiBlockSplit) { - CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit; - List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments(); - if (invalidSegments.size() > 0) { - queryModel.setInvalidSegmentIds(invalidSegments); - } - List<UpdateVO> invalidTimestampRangeList = - split.getAllSplits().get(0).getInvalidTimestampRange(); - if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) { - 
queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList); - } - } - return queryModel; - } - - public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) { - String readSupportClass = configuration.get(CARBON_READ_SUPPORT); - //By default it uses dictionary decoder read class - CarbonReadSupport<T> readSupport = null; - if (readSupportClass != null) { - try { - Class<?> myClass = Class.forName(readSupportClass); - Constructor<?> constructor = myClass.getConstructors()[0]; - Object object = constructor.newInstance(); - if (object instanceof CarbonReadSupport) { - readSupport = (CarbonReadSupport) object; - } - } catch (ClassNotFoundException ex) { - LOG.error("Class " + readSupportClass + "not found", ex); - } catch (Exception ex) { - LOG.error("Error while creating " + readSupportClass, ex); - } - } else { - readSupport = new DictionaryDecodeReadSupport<>(); - } - return readSupport; - } - - @Override - protected boolean isSplitable(JobContext context, Path filename) { - try { - // Don't split the file if it is local file system - FileSystem fileSystem = filename.getFileSystem(context.getConfiguration()); - if (fileSystem instanceof LocalFileSystem) { - return false; - } - } catch (Exception e) { - return true; - } - return true; - } - /** * return valid segment to access */ @@ -970,58 +569,4 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping); } - - /** - * It is optional, if user does not set then it reads from store - * - * @param configuration - * @param converter is the Data type converter for different computing engine - * @throws IOException - */ - public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter) - throws IOException { - if (null != converter) { - configuration.set(CARBON_CONVERTER, - ObjectSerializationUtil.convertObjectToString(converter)); - } - } - - public static DataTypeConverter getDataTypeConverter(Configuration configuration) - throws IOException { - String converter = configuration.get(CARBON_CONVERTER); - if (converter == null) { - return new DataTypeConverterImpl(); - } - return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter); - } - - public static void setDatabaseName(Configuration configuration, String databaseName) { - if (null != databaseName) { - configuration.set(DATABASE_NAME, databaseName); - } - } - - public static String getDatabaseName(Configuration configuration) - throws InvalidConfigurationException { - String databseName = configuration.get(DATABASE_NAME); - if (null == databseName) { - throw new InvalidConfigurationException("Database name is not set."); - } - return databseName; - } - - public static void setTableName(Configuration configuration, String tableName) { - if (null != tableName) { - configuration.set(TABLE_NAME, tableName); - } - } - - public static String getTableName(Configuration configuration) - throws InvalidConfigurationException { - String tableName = configuration.get(TABLE_NAME); - if (tableName == null) { - throw new InvalidConfigurationException("Table name is not set"); - } - return tableName; - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java index ab7c333..9df59e6 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java @@ -87,11 +87,11 @@ public class SchemaReader { // Convert the ColumnSchema -> TableSchema -> TableInfo. // Return the TableInfo. org.apache.carbondata.format.TableInfo tableInfo = - CarbonUtil.inferSchemaFileExternalTable(identifier.getTablePath(), identifier, false); + CarbonUtil.inferSchema(identifier.getTablePath(), identifier, false); SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - TableInfo wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(tableInfo, identifier.getDatabaseName(), - identifier.getTableName(), identifier.getTablePath()); + TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( + tableInfo, identifier.getDatabaseName(), identifier.getTableName(), + identifier.getTablePath()); return wrapperTableInfo; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c09ef998/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala index 8b1f63f..7841a23 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala @@ -105,14 +105,14 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be } //TO DO, need to remove segment dependency and tableIdentifier Dependency - test("read carbondata files (sdk Writer Output) using the Carbonfile ") { + test("read carbondata files (sdk Writer Output) using the carbonfile ") { buildTestData(false) assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") - //new provider Carbonfile + //new provider carbonfile sql( - s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION |'$writerPath' """.stripMargin) sql("Describe formatted sdkOutputTable").show(false) @@ -152,7 +152,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be //data source file format sql( - s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION |'$writerPath' """.stripMargin) val exception = intercept[MalformedCarbonCommandException] @@ -176,7 +176,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be //data source file format sql( - s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION |'$writerPath' """.stripMargin) //org.apache.spark.SparkException: Index file not present to read the carbondata file @@ -192,7 +192,7 @@ class 
TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be cleanTestData() } - + // TODO: Make sparkCarbonFileFormat work without the index file test("Read sdk writer output file without Carbondata file should fail") { buildTestData(false) deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) @@ -202,7 +202,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be val exception = intercept[Exception] { // data source file format sql( - s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION |'$writerPath' """.stripMargin) } assert(exception.getMessage() @@ -225,7 +225,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be val exception = intercept[Exception] { //data source file format sql( - s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION |'$writerPath' """.stripMargin) sql("select * from sdkOutputTable").show(false)

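----------------------------------------------------------------------
For reference, the end-to-end pattern these tests exercise can be sketched as follows. This is a hypothetical snippet, not part of this patch: it assumes a CarbonSession is in scope as `spark` and that `writerPath` (illustrative value below) points at a directory of SDK writer output containing both carbondata and index files.

// Hypothetical location of SDK writer output.
val writerPath = "/tmp/sdk_writer_output"
spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
// 'carbonfile' (lower case) is the file-level provider spelling this change standardizes on.
spark.sql(
  s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
     |'$writerPath' """.stripMargin)
spark.sql("SELECT * FROM sdkOutputTable").show(false)
----------------------------------------------------------------------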