http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java new file mode 100644 index 0000000..b2c2d39 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.hadoop.testutil; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapStoreManager; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonMetadata; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnIdentifier; +import org.apache.carbondata.core.metadata.converter.SchemaConverter; +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import 
org.apache.carbondata.core.metadata.schema.SchemaEvolution; +import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.metadata.schema.table.TableSchema; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.writer.CarbonDictionaryWriter; +import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl; +import org.apache.carbondata.core.writer.ThriftWriter; +import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter; +import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl; +import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo; +import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.loading.csvinput.BlockDetails; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; +import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import 
org.apache.carbondata.processing.util.TableOptionConstant;

import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

/**
 * Test utility that creates a carbon store on the local file system: it writes the schema
 * file for a fixed seven-column table, writes the dictionary files for the table's dimensions
 * from a CSV fact file, loads that CSV through {@link DataLoadExecutor} and finally writes the
 * table status file.
 *
 * <p>All state is static; the store is created under {@code target/store} for table
 * {@code testdb.testtable}.
 */
public class StoreCreator {

  private static final LogService LOG =
      LogServiceFactory.getLogService(StoreCreator.class.getCanonicalName());
  private static final AbsoluteTableIdentifier absoluteTableIdentifier;
  private static final String storePath;

  static {
    storePath = new File("target/store").getAbsolutePath();
    String dbName = "testdb";
    String tableName = "testtable";
    absoluteTableIdentifier = AbsoluteTableIdentifier.from(
        storePath + "/" + dbName + "/" + tableName,
        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
  }

  /**
   * @return the identifier of the table this utility creates
   */
  public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
    return absoluteTableIdentifier;
  }

  /**
   * Builds the load model used to load {@code factFilePath} into {@code table}.
   *
   * @param table                   table to load into
   * @param factFilePath            path of the CSV fact file
   * @param absoluteTableIdentifier identifier supplying database name, table name and table path
   * @return a load model for segment "0", task "0", with the fixed CSV header
   *         {@code ID,date,country,name,phonetype,serialname,salary}
   */
  public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath,
      AbsoluteTableIdentifier absoluteTableIdentifier) {
    CarbonTableIdentifier tableIdentifier = absoluteTableIdentifier.getCarbonTableIdentifier();
    CarbonProperties properties = CarbonProperties.getInstance();

    CarbonLoadModel loadModel = new CarbonLoadModel();
    loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table));
    loadModel.setDatabaseName(tableIdentifier.getDatabaseName());
    loadModel.setTableName(tableIdentifier.getTableName());
    loadModel.setFactFilePath(factFilePath);
    loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
    loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
    loadModel.setDateFormat(null);
    loadModel.setDefaultTimestampFormat(properties.getProperty(
        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
    loadModel.setDefaultDateFormat(properties.getProperty(
        CarbonCommonConstants.CARBON_DATE_FORMAT,
        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
    // These options are passed as "<option name>,<value>" strings.
    loadModel.setSerializationNullFormat(
        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N");
    loadModel.setBadRecordsLoggerEnable(
        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false");
    loadModel.setBadRecordsAction(
        TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE");
    loadModel.setIsEmptyDataBadRecord(
        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false");
    loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
    loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
    loadModel.setTaskNo("0");
    loadModel.setSegmentId("0");
    loadModel.setFactTimeStamp(System.currentTimeMillis());
    loadModel.setMaxColumns("10");
    return loadModel;
  }

  /**
   * Create store without any restructure
   */
  public static void createCarbonStore() throws Exception {
    CarbonLoadModel loadModel = createTableAndLoadModel();
    loadData(loadModel, storePath);
  }

  /**
   * Method to clear the data maps
   */
  public static void clearDataMaps() {
    DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier);
  }

  /**
   * Deletes any existing store directory, creates the table schema, writes the dictionary
   * files from {@code ../hadoop/src/test/resources/data.csv} and builds the load model.
   *
   * @return load model ready to be passed to {@link #loadData(CarbonLoadModel, String)}
   */
  public static CarbonLoadModel createTableAndLoadModel() throws Exception {
    String factFilePath =
        new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
    File storeDir = new File(storePath);
    CarbonUtil.deleteFoldersAndFiles(storeDir);
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
        storePath);

    CarbonTable table = createTable(absoluteTableIdentifier);
    writeDictionary(factFilePath, table);
    return buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier);
  }

  /**
   * Creates the seven-column test table, registers it in {@link CarbonMetadata} and writes
   * its thrift schema file under the table path.
   *
   * @param identifier identifier supplying database name, table name and table path
   * @return the table as registered in {@link CarbonMetadata}
   * @throws IOException if the schema folder or schema file cannot be written
   */
  public static CarbonTable createTable(
      AbsoluteTableIdentifier identifier) throws IOException {
    TableInfo tableInfo = new TableInfo();
    tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
    TableSchema tableSchema = new TableSchema();
    tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());

    // One encoding list instance is shared by all dictionary dimensions, as before.
    ArrayList<Encoding> dictionaryEncoding = new ArrayList<>();
    dictionaryEncoding.add(Encoding.DICTIONARY);

    List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
    ColumnSchema id = newColumn("ID", DataTypes.INT, dictionaryEncoding, true, false, 1, null);
    // NOTE(review): every other column uses the ID column's unique id as its reference id.
    // This is kept exactly as it was; it looks like it may be a copy-paste - confirm intent.
    String idColumnUniqueId = id.getColumnUniqueId();
    columnSchemas.add(id);
    columnSchemas.add(newColumn(
        "date", DataTypes.STRING, dictionaryEncoding, true, true, 2, idColumnUniqueId));
    columnSchemas.add(newColumn(
        "country", DataTypes.STRING, dictionaryEncoding, true, true, 3, idColumnUniqueId));
    columnSchemas.add(newColumn(
        "name", DataTypes.STRING, dictionaryEncoding, true, true, 4, idColumnUniqueId));
    columnSchemas.add(newColumn(
        "phonetype", DataTypes.STRING, dictionaryEncoding, true, true, 5, idColumnUniqueId));
    columnSchemas.add(newColumn(
        "serialname", DataTypes.STRING, dictionaryEncoding, true, true, 6, idColumnUniqueId));
    columnSchemas.add(newColumn(
        "salary", DataTypes.INT, new ArrayList<Encoding>(), false, false, 7, idColumnUniqueId));

    tableSchema.setListOfColumns(columnSchemas);
    SchemaEvolution schemaEvol = new SchemaEvolution();
    schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
    tableSchema.setSchemaEvalution(schemaEvol);
    tableSchema.setTableId(UUID.randomUUID().toString());
    tableInfo.setTableUniqueName(identifier.getCarbonTableIdentifier().getTableUniqueName());
    tableInfo.setLastUpdatedTime(System.currentTimeMillis());
    tableInfo.setFactTable(tableSchema);
    tableInfo.setTablePath(identifier.getTablePath());
    String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
    String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
    CarbonMetadata.getInstance().loadTableMetadata(tableInfo);

    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
    org.apache.carbondata.format.TableInfo thriftTableInfo =
        schemaConverter.fromWrapperToExternalTableInfo(
            tableInfo,
            tableInfo.getDatabaseName(),
            tableInfo.getFactTable().getTableName());
    org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
        new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
    thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
        .add(schemaEvolutionEntry);

    FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
      FileFactory.mkdirs(schemaMetadataPath, fileType);
    }

    ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
    thriftWriter.open();
    try {
      thriftWriter.write(thriftTableInfo);
    } finally {
      // Release the schema file even when the write fails.
      thriftWriter.close();
    }
    return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
  }

  /**
   * Builds one columnar column schema with a freshly generated unique id.
   *
   * @param columnName  column name
   * @param dataType    column data type
   * @param encodings   encoding list to attach (stored as passed, not copied here)
   * @param isDimension whether the column is a dimension
   * @param isSortColumn whether to mark the column as a sort column
   * @param columnGroup column group number
   * @param referenceId column reference id, or {@code null} to reference the column itself
   */
  private static ColumnSchema newColumn(String columnName,
      org.apache.carbondata.core.metadata.datatype.DataType dataType,
      ArrayList<Encoding> encodings, boolean isDimension, boolean isSortColumn,
      int columnGroup, String referenceId) {
    ColumnSchema column = new ColumnSchema();
    column.setColumnName(columnName);
    column.setColumnar(true);
    column.setDataType(dataType);
    column.setEncodingList(encodings);
    column.setColumnUniqueId(UUID.randomUUID().toString());
    column.setColumnReferenceId(referenceId == null ? column.getColumnUniqueId() : referenceId);
    column.setDimensionColumn(isDimension);
    column.setColumnGroup(columnGroup);
    if (isSortColumn) {
      // Only set when requested, so columns that never had the flag set stay untouched.
      column.setSortColumn(true);
    }
    return column;
  }

  /**
   * Writes the dictionary file and the sort index file for every dimension of {@code table},
   * using the distinct values found in the corresponding column of the CSV fact file.
   */
  private static void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
    List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
    List<Set<String>> distinctValues = readDistinctColumnValues(factFilePath, dims.size());

    Cache dictCache = CacheProvider.getInstance()
        .createCache(CacheType.REVERSE_DICTIONARY);
    for (int i = 0; i < distinctValues.size(); i++) {
      CarbonDimension dimension = dims.get(i);
      ColumnIdentifier columnIdentifier =
          new ColumnIdentifier(dimension.getColumnId(), null, null);
      // NOTE(review): columnIdentifier is built with a null data type, so getDataType() here
      // presumably yields null, while the cache lookup below uses the dimension's data type.
      // Kept as it was - confirm this asymmetry is intended.
      DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
          new DictionaryColumnUniqueIdentifier(
              table.getAbsoluteTableIdentifier(), columnIdentifier,
              columnIdentifier.getDataType());

      CarbonDictionaryWriter writer =
          new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
      try {
        for (String value : distinctValues.get(i)) {
          writer.write(value);
        }
      } finally {
        // Close even when a write fails; commit below only runs after a clean close.
        writer.close();
      }
      writer.commit();

      Dictionary dict = (Dictionary) dictCache.get(
          new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
              columnIdentifier, dimension.getDataType()));
      CarbonDictionarySortInfoPreparator preparator =
          new CarbonDictionarySortInfoPreparator();
      List<String> newDistinctValues = new ArrayList<String>();
      CarbonDictionarySortInfo dictionarySortInfo =
          preparator.getDictionarySortInfo(newDistinctValues, dict, dimension.getDataType());
      CarbonDictionarySortIndexWriter carbonDictionaryWriter =
          new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
      try {
        carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
        carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
      } finally {
        carbonDictionaryWriter.close();
      }
    }
  }

  /**
   * Reads the fact file line by line, splits each line on {@code ","} and collects the
   * distinct values of the first {@code columnCount} fields.
   *
   * @return one set of distinct values per column, in column order
   */
  private static List<Set<String>> readDistinctColumnValues(String factFilePath,
      int columnCount) throws IOException {
    List<Set<String>> distinctValues = new ArrayList<Set<String>>(columnCount);
    for (int i = 0; i < columnCount; i++) {
      distinctValues.add(new HashSet<String>());
    }
    // try-with-resources closes the reader even when reading fails part-way.
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
        new FileInputStream(factFilePath), "UTF-8"))) {
      String line = reader.readLine();
      while (line != null) {
        String[] data = line.split(",");
        for (int i = 0; i < columnCount; i++) {
          distinctValues.get(i).add(data[i]);
        }
        line = reader.readLine();
      }
    }
    return distinctValues;
  }

  /**
   * Loads the fact file named in the load model into the store and writes the table status
   * file afterwards.
   *
   * @param loadModel     load model naming the database, table, fact file and CSV options
   * @param storeLocation root directory of the store; created when it does not exist
   * @throws Exception if the load or the table status write fails
   */
  public static void loadData(CarbonLoadModel loadModel, String storeLocation)
      throws Exception {
    File storeDir = new File(storeLocation);
    // mkdirs() also returns false when the directory already exists, so only warn when the
    // directory is really missing afterwards.
    if (!storeDir.mkdirs() && !storeDir.isDirectory()) {
      LOG.warn("mkdir is failed");
    }
    String outPutLoc = storeLocation + "/etl";
    String databaseName = loadModel.getDatabaseName();
    String tableName = loadModel.getTableName();
    String tempLocationKey = databaseName + '_' + tableName + "_1";
    String tableLocation = storeLocation + "/" + databaseName + "/" + tableName;

    CarbonProperties properties = CarbonProperties.getInstance();
    properties.addProperty(tempLocationKey, tableLocation);
    properties.addProperty("store_output_location", outPutLoc);
    properties.addProperty("send.signal.load", "false");
    properties.addProperty("carbon.is.columnar.storage", "true");
    properties.addProperty("carbon.dimension.split.value.in.columnar", "1");
    properties.addProperty("carbon.is.fullyfilled.bits", "true");
    properties.addProperty("is.int.based.indexer", "true");
    properties.addProperty("aggregate.columnar.keyblock", "true");
    properties.addProperty("is.compressed.keyblock", "false");
    properties.addProperty("carbon.leaf.node.size", "120000");

    // Remove a stale graph file from an earlier run, if there is one.
    String graphPath =
        outPutLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName
            + File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr";
    File path = new File(graphPath);
    if (path.exists()) {
      if (!path.delete()) {
        LOG.warn("delete " + path + " failed");
      }
    }

    // A single block covering the whole fact file.
    BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
        0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
    Configuration configuration = new Configuration();
    CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar());
    CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter());
    CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar());
    CSVInputFormat.setHeaderExtractionEnabled(configuration, true);
    CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar());
    CSVInputFormat.setReadBufferSize(configuration,
        properties.getProperty(
            CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
    CSVInputFormat.setNumberOfColumns(
        configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
    CSVInputFormat.setMaxColumns(configuration, "10");

    TaskAttemptContextImpl hadoopAttemptContext =
        new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
    CSVInputFormat format = new CSVInputFormat();

    RecordReader<NullWritable, StringArrayWritable> recordReader =
        format.createRecordReader(blockDetails, hadoopAttemptContext);

    CSVRecordReaderIterator readerIterator =
        new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
    new DataLoadExecutor().execute(loadModel,
        new String[] {tableLocation},
        new CarbonIterator[]{readerIterator});

    writeLoadMetadata(
        loadModel.getCarbonDataLoadSchema(), databaseName, tableName,
        new ArrayList<LoadMetadataDetails>());
  }

  /**
   * Appends a successful load entry named "0" to {@code listOfLoadFolderDetails} and writes
   * the whole list as JSON to the table status file in the table's metadata folder.
   *
   * @param schema                  schema whose table supplies the metadata path
   * @param databaseName            not used by this method
   * @param tableName               not used by this method
   * @param listOfLoadFolderDetails existing load entries; the new entry is added to this list
   * @throws IOException if the table status file cannot be written
   */
  public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
      String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
    LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
    loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
    loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
    loadMetadataDetails.setLoadName(String.valueOf(0));
    loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
    listOfLoadFolderDetails.add(loadMetadataDetails);

    String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
        + CarbonTablePath.TABLE_STATUS_FILE;

    AtomicFileOperations writeOperation =
        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
    BufferedWriter brWriter = null;
    try {
      DataOutputStream dataOutputStream =
          writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));

      String metadataInstance = new Gson().toJson(listOfLoadFolderDetails.toArray());
      brWriter.write(metadataInstance);
      // Flush explicitly so an I/O failure is thrown from here, before the cleanup below.
      brWriter.flush();
    } finally {
      // Always release the stream, including when the write or flush above failed.
      CarbonUtil.closeStreams(brWriter);
    }
    // Reached only when the write succeeded, exactly as before.
    writeOperation.close();
  }

  /**
   * @return the current time formatted with
   *         {@link CarbonCommonConstants#CARBON_TIMESTAMP_MILLIS}
   */
  public static String readCurrentTime() {
    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
    return sdf.format(new Date());
  }

  public static void main(String[] args) throws Exception {
    StoreCreator.createCarbonStore();
  }

}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java deleted file mode 100644 index 395015e..0000000 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.hadoop.util; - -import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; -import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.encoder.Encoding; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; - -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.DecimalType; -import org.apache.spark.sql.types.StructField; - -public class CarbonTypeUtil { - - public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType( - DataType carbonDataType) { - if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) { - return DataTypes.StringType; - } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) { - return DataTypes.ShortType; - } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) { - return DataTypes.IntegerType; - } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) { - return DataTypes.LongType; - } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) { - return DataTypes.DoubleType; - } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) { - return DataTypes.BooleanType; - } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) { - return DataTypes.createDecimalType(); - } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) { - return DataTypes.TimestampType; - } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) { - return DataTypes.DateType; - } else { - return null; - } - } - - 
public static StructField[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) { - StructField[] fields = new StructField[carbonColumns.length]; - for (int i = 0; i < carbonColumns.length; i++) { - CarbonColumn carbonColumn = carbonColumns[i]; - if (carbonColumn.isDimension()) { - if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) { - DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory - .getDirectDictionaryGenerator(carbonColumn.getDataType()); - fields[i] = new StructField(carbonColumn.getColName(), - CarbonTypeUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null); - } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) { - fields[i] = new StructField(carbonColumn.getColName(), - CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null); - } else if (carbonColumn.isComplex()) { - fields[i] = new StructField(carbonColumn.getColName(), - CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null); - } else { - fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil - .convertCarbonToSparkDataType( - org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null); - } - } else if (carbonColumn.isMeasure()) { - DataType dataType = carbonColumn.getDataType(); - if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN - || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT - || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT - || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) { - fields[i] = new StructField(carbonColumn.getColName(), - CarbonTypeUtil.convertCarbonToSparkDataType(dataType), true, null); - } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) { - CarbonMeasure measure = (CarbonMeasure) carbonColumn; - fields[i] = new StructField(carbonColumn.getColName(), - new 
DecimalType(measure.getPrecision(), measure.getScale()), true, null); - } else { - fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil - .convertCarbonToSparkDataType( - org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null); - } - } - } - return fields; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java index 2f029ab..ea242d1 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java @@ -27,8 +27,6 @@ import java.io.IOException; import java.util.List; import java.util.UUID; -import junit.framework.TestCase; - import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -41,7 +39,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CarbonProjection; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; -import org.apache.carbondata.hadoop.test.util.StoreCreator; +import org.apache.carbondata.hadoop.testutil.StoreCreator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -52,9 +50,7 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import 
org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; public class CarbonTableInputFormatTest { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java index 653a49e..99f69c2 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java @@ -26,7 +26,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; -import org.apache.carbondata.hadoop.test.util.StoreCreator; +import org.apache.carbondata.hadoop.testutil.StoreCreator; import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java deleted file mode 100644 index 57f488f..0000000 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.hadoop.streaming; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.UUID; - -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.statusmanager.FileFormat; -import org.apache.carbondata.hadoop.CarbonInputSplit; -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; - -import junit.framework.TestCase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.junit.Assert; -import org.junit.Test; - -public class 
CarbonStreamInputFormatTest extends TestCase { - - private TaskAttemptID taskAttemptId; - private TaskAttemptContext taskAttemptContext; - private Configuration hadoopConf; - private AbsoluteTableIdentifier identifier; - private String tablePath; - - - @Override protected void setUp() throws Exception { - tablePath = new File("target/stream_input").getCanonicalPath(); - String dbName = "default"; - String tableName = "stream_table_input"; - identifier = AbsoluteTableIdentifier.from( - tablePath, - new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString())); - - JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0); - TaskID taskId = new TaskID(jobId, TaskType.MAP, 0); - taskAttemptId = new TaskAttemptID(taskId, 0); - - hadoopConf = new Configuration(); - taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId); - } - - private InputSplit buildInputSplit() throws IOException { - CarbonInputSplit carbonInputSplit = new CarbonInputSplit(); - List<CarbonInputSplit> splitList = new ArrayList<>(); - splitList.add(carbonInputSplit); - return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" }, - FileFormat.ROW_V1); - } - - @Test public void testCreateRecordReader() { - try { - InputSplit inputSplit = buildInputSplit(); - CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat(); - RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext); - Assert.assertNotNull("Failed to create record reader", recordReader); - } catch (Exception e) { - e.printStackTrace(); - Assert.assertTrue(e.getMessage(), false); - } - } - - @Override protected void tearDown() throws Exception { - super.tearDown(); - if (tablePath != null) { - FileFactory.deleteAllFilesOfDir(new File(tablePath)); - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java 
---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java deleted file mode 100644 index e871c7e..0000000 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.hadoop.streaming; - -import java.io.File; -import java.io.IOException; -import java.util.Date; -import java.util.UUID; - -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.hadoop.test.util.StoreCreator; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; -import org.apache.carbondata.processing.loading.model.CarbonLoadModel; - -import junit.framework.TestCase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.junit.Assert; -import org.junit.Test; - -public class CarbonStreamOutputFormatTest extends TestCase { - - private Configuration hadoopConf; - private TaskAttemptID taskAttemptId; - private CarbonLoadModel carbonLoadModel; - private String tablePath; - - @Override protected void setUp() throws Exception { - super.setUp(); - JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0); - TaskID taskId = new TaskID(jobId, TaskType.MAP, 0); - taskAttemptId = new TaskAttemptID(taskId, 0); - - hadoopConf = new Configuration(); - hadoopConf.set("mapred.job.id", jobId.toString()); - hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString()); - hadoopConf.set("mapred.task.id", taskAttemptId.toString()); - hadoopConf.setBoolean("mapred.task.is.map", true); - hadoopConf.setInt("mapred.task.partition", 0); - - tablePath = new File("target/stream_output").getCanonicalPath(); - String dbName = "default"; - String 
tableName = "stream_table_output"; - AbsoluteTableIdentifier identifier = - AbsoluteTableIdentifier.from( - tablePath, - new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString())); - - CarbonTable table = StoreCreator.createTable(identifier); - - String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath(); - carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier); - } - - @Test public void testSetCarbonLoadModel() { - try { - CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel); - } catch (IOException e) { - Assert.assertTrue("Failed to config CarbonLoadModel for CarbonStreamOutputFromat", false); - } - } - - @Test public void testGetCarbonLoadModel() { - try { - CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel); - CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf); - - Assert.assertNotNull("Failed to get CarbonLoadModel", model); - Assert.assertTrue("CarbonLoadModel should be same with previous", - carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp()); - - } catch (IOException e) { - Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false); - } - } - - @Test public void testGetRecordWriter() { - CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat(); - try { - CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel); - TaskAttemptContext taskAttemptContext = - new TaskAttemptContextImpl(hadoopConf, taskAttemptId); - RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext); - Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter); - } catch (Exception e) { - e.printStackTrace(); - Assert.assertTrue(e.getMessage(), false); - } - } - - @Override protected void tearDown() throws Exception { - super.tearDown(); - if (tablePath != null) { - FileFactory.deleteAllFilesOfDir(new File(tablePath)); - } - } -} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java deleted file mode 100644 index 8e8916d..0000000 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ /dev/null @@ -1,492 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.hadoop.test.util; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.charset.Charset; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.core.cache.Cache; -import org.apache.carbondata.core.cache.CacheProvider; -import org.apache.carbondata.core.cache.CacheType; -import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datamap.DataMapStoreManager; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.fileoperations.AtomicFileOperations; -import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; -import org.apache.carbondata.core.fileoperations.FileWriteOperation; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.ColumnIdentifier; -import org.apache.carbondata.core.metadata.converter.SchemaConverter; -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.encoder.Encoding; -import org.apache.carbondata.core.metadata.schema.SchemaEvolution; -import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry; -import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.metadata.schema.table.TableSchema; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; -import org.apache.carbondata.core.statusmanager.SegmentStatus; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.core.writer.CarbonDictionaryWriter; -import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl; -import org.apache.carbondata.core.writer.ThriftWriter; -import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter; -import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl; -import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo; -import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator; -import org.apache.carbondata.processing.loading.DataLoadExecutor; -import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; -import org.apache.carbondata.processing.loading.csvinput.BlockDetails; -import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; -import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator; -import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; -import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; -import org.apache.carbondata.processing.loading.model.CarbonLoadModel; -import 
org.apache.carbondata.processing.util.TableOptionConstant; - -import com.google.gson.Gson; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; - -/** - * This class will create store file based on provided schema - * - */ -public class StoreCreator { - - private static AbsoluteTableIdentifier absoluteTableIdentifier; - private static String storePath = null; - - static { - try { - storePath = new File("target/store").getCanonicalPath(); - String dbName = "testdb"; - String tableName = "testtable"; - absoluteTableIdentifier = - AbsoluteTableIdentifier.from( - storePath +"/testdb/testtable", - new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString())); - } catch (IOException ex) { - - } - } - - public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() { - return absoluteTableIdentifier; - } - - public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath, - AbsoluteTableIdentifier absoluteTableIdentifier) { - CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table); - CarbonLoadModel loadModel = new CarbonLoadModel(); - loadModel.setCarbonDataLoadSchema(schema); - loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); - loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); - loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); - loadModel.setFactFilePath(factFilePath); - loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>()); - loadModel.setTablePath(absoluteTableIdentifier.getTablePath()); - loadModel.setDateFormat(null); - 
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, - CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS)); - loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_DATE_FORMAT, - CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)); - loadModel - .setSerializationNullFormat( - TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N"); - loadModel - .setBadRecordsLoggerEnable( - TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false"); - loadModel - .setBadRecordsAction( - TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE"); - loadModel - .setIsEmptyDataBadRecord( - DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false"); - loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary"); - loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(",")); - loadModel.setTaskNo("0"); - loadModel.setSegmentId("0"); - loadModel.setFactTimeStamp(System.currentTimeMillis()); - loadModel.setMaxColumns("10"); - return loadModel; - } - - /** - * Create store without any restructure - */ - public static void createCarbonStore() throws Exception { - CarbonLoadModel loadModel = createTableAndLoadModel(); - loadData(loadModel, storePath); - } - - /** - * Method to clear the data maps - */ - public static void clearDataMaps() { - DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier); - } - - public static CarbonLoadModel createTableAndLoadModel() throws Exception { - String factFilePath = - new File("../hadoop/src/test/resources/data.csv").getCanonicalPath(); - File storeDir = new File(storePath); - CarbonUtil.deleteFoldersAndFiles(storeDir); - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, - storePath); - - CarbonTable table = createTable(absoluteTableIdentifier); - writeDictionary(factFilePath, table); - return 
buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier); - } - - public static CarbonTable createTable( - AbsoluteTableIdentifier identifier) throws IOException { - TableInfo tableInfo = new TableInfo(); - tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName()); - TableSchema tableSchema = new TableSchema(); - tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName()); - List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>(); - ArrayList<Encoding> encodings = new ArrayList<>(); - encodings.add(Encoding.DICTIONARY); - ColumnSchema id = new ColumnSchema(); - id.setColumnName("ID"); - id.setColumnar(true); - id.setDataType(DataTypes.INT); - id.setEncodingList(encodings); - id.setColumnUniqueId(UUID.randomUUID().toString()); - id.setColumnReferenceId(id.getColumnUniqueId()); - id.setDimensionColumn(true); - id.setColumnGroup(1); - columnSchemas.add(id); - - ColumnSchema date = new ColumnSchema(); - date.setColumnName("date"); - date.setColumnar(true); - date.setDataType(DataTypes.STRING); - date.setEncodingList(encodings); - date.setColumnUniqueId(UUID.randomUUID().toString()); - date.setDimensionColumn(true); - date.setColumnGroup(2); - date.setSortColumn(true); - date.setColumnReferenceId(id.getColumnUniqueId()); - columnSchemas.add(date); - - ColumnSchema country = new ColumnSchema(); - country.setColumnName("country"); - country.setColumnar(true); - country.setDataType(DataTypes.STRING); - country.setEncodingList(encodings); - country.setColumnUniqueId(UUID.randomUUID().toString()); - country.setDimensionColumn(true); - country.setColumnGroup(3); - country.setSortColumn(true); - country.setColumnReferenceId(id.getColumnUniqueId()); - columnSchemas.add(country); - - ColumnSchema name = new ColumnSchema(); - name.setColumnName("name"); - name.setColumnar(true); - name.setDataType(DataTypes.STRING); - name.setEncodingList(encodings); - name.setColumnUniqueId(UUID.randomUUID().toString()); - 
name.setDimensionColumn(true); - name.setColumnGroup(4); - name.setSortColumn(true); - name.setColumnReferenceId(id.getColumnUniqueId()); - columnSchemas.add(name); - - ColumnSchema phonetype = new ColumnSchema(); - phonetype.setColumnName("phonetype"); - phonetype.setColumnar(true); - phonetype.setDataType(DataTypes.STRING); - phonetype.setEncodingList(encodings); - phonetype.setColumnUniqueId(UUID.randomUUID().toString()); - phonetype.setDimensionColumn(true); - phonetype.setColumnGroup(5); - phonetype.setSortColumn(true); - phonetype.setColumnReferenceId(id.getColumnUniqueId()); - columnSchemas.add(phonetype); - - ColumnSchema serialname = new ColumnSchema(); - serialname.setColumnName("serialname"); - serialname.setColumnar(true); - serialname.setDataType(DataTypes.STRING); - serialname.setEncodingList(encodings); - serialname.setColumnUniqueId(UUID.randomUUID().toString()); - serialname.setDimensionColumn(true); - serialname.setColumnGroup(6); - serialname.setSortColumn(true); - serialname.setColumnReferenceId(id.getColumnUniqueId()); - columnSchemas.add(serialname); - - ColumnSchema salary = new ColumnSchema(); - salary.setColumnName("salary"); - salary.setColumnar(true); - salary.setDataType(DataTypes.INT); - salary.setEncodingList(new ArrayList<Encoding>()); - salary.setColumnUniqueId(UUID.randomUUID().toString()); - salary.setDimensionColumn(false); - salary.setColumnReferenceId(id.getColumnUniqueId()); - salary.setColumnGroup(7); - columnSchemas.add(salary); - - tableSchema.setListOfColumns(columnSchemas); - SchemaEvolution schemaEvol = new SchemaEvolution(); - schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>()); - tableSchema.setSchemaEvalution(schemaEvol); - tableSchema.setTableId(UUID.randomUUID().toString()); - tableInfo.setTableUniqueName( - identifier.getCarbonTableIdentifier().getTableUniqueName() - ); - tableInfo.setLastUpdatedTime(System.currentTimeMillis()); - tableInfo.setFactTable(tableSchema); - 
tableInfo.setTablePath(identifier.getTablePath()); - String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath()); - String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath); - CarbonMetadata.getInstance().loadTableMetadata(tableInfo); - - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - org.apache.carbondata.format.TableInfo thriftTableInfo = - schemaConverter.fromWrapperToExternalTableInfo( - tableInfo, - tableInfo.getDatabaseName(), - tableInfo.getFactTable().getTableName()); - org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry = - new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime()); - thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history() - .add(schemaEvolutionEntry); - - FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath); - if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { - FileFactory.mkdirs(schemaMetadataPath, fileType); - } - - ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false); - thriftWriter.open(); - thriftWriter.write(thriftTableInfo); - thriftWriter.close(); - return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName()); - } - - private static void writeDictionary(String factFilePath, CarbonTable table) throws Exception { - BufferedReader reader = new BufferedReader(new FileReader(factFilePath)); - String header = reader.readLine(); - String[] split = header.split(","); - List<CarbonColumn> allCols = new ArrayList<CarbonColumn>(); - List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName()); - allCols.addAll(dims); - List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName()); - allCols.addAll(msrs); - Set<String>[] set = new HashSet[dims.size()]; - for (int i = 0; i < set.length; i++) { - set[i] = new HashSet<String>(); - } - String line = reader.readLine(); - while (line != 
null) { - String[] data = line.split(","); - for (int i = 0; i < set.length; i++) { - set[i].add(data[i]); - } - line = reader.readLine(); - } - - Cache dictCache = CacheProvider.getInstance() - .createCache(CacheType.REVERSE_DICTIONARY); - for (int i = 0; i < set.length; i++) { - ColumnIdentifier columnIdentifier = new ColumnIdentifier(dims.get(i).getColumnId(), null, null); - DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = - new DictionaryColumnUniqueIdentifier(table.getAbsoluteTableIdentifier(), columnIdentifier, - columnIdentifier.getDataType()); - CarbonDictionaryWriter writer = - new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier); - for (String value : set[i]) { - writer.write(value); - } - writer.close(); - writer.commit(); - Dictionary dict = (Dictionary) dictCache.get( - new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, - columnIdentifier, dims.get(i).getDataType())); - CarbonDictionarySortInfoPreparator preparator = - new CarbonDictionarySortInfoPreparator(); - List<String> newDistinctValues = new ArrayList<String>(); - CarbonDictionarySortInfo dictionarySortInfo = - preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType()); - CarbonDictionarySortIndexWriter carbonDictionaryWriter = - new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier); - try { - carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex()); - carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted()); - } finally { - carbonDictionaryWriter.close(); - } - } - reader.close(); - } - - /** - * Execute graph which will further load data - * - * @param loadModel - * @param storeLocation - * @throws Exception - */ - public static void loadData(CarbonLoadModel loadModel, String storeLocation) - throws Exception { - new File(storeLocation).mkdirs(); - String outPutLoc = storeLocation + "/etl"; - String databaseName = loadModel.getDatabaseName(); - String 
tableName = loadModel.getTableName(); - String tempLocationKey = databaseName + '_' + tableName + "_1"; - CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation + "/" + databaseName + "/" + tableName); - CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc); - CarbonProperties.getInstance().addProperty("send.signal.load", "false"); - CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true"); - CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1"); - CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true"); - CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true"); - CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true"); - CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false"); - CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000"); - - String graphPath = - outPutLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName - + File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr"; - File path = new File(graphPath); - if (path.exists()) { - path.delete(); - } - - BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()), - 0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"}); - Configuration configuration = new Configuration(); - CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar()); - CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter()); - CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar()); - CSVInputFormat.setHeaderExtractionEnabled(configuration, true); - CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar()); - CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, - 
CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT)); - CSVInputFormat.setNumberOfColumns(configuration, String.valueOf(loadModel.getCsvHeaderColumns().length)); - CSVInputFormat.setMaxColumns(configuration, "10"); - - TaskAttemptContextImpl hadoopAttemptContext = new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0)); - CSVInputFormat format = new CSVInputFormat(); - - RecordReader<NullWritable, StringArrayWritable> recordReader = - format.createRecordReader(blockDetails, hadoopAttemptContext); - - CSVRecordReaderIterator readerIterator = new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext); - new DataLoadExecutor().execute(loadModel, - new String[] {storeLocation + "/" + databaseName + "/" + tableName}, - new CarbonIterator[]{readerIterator}); - - writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(), - new ArrayList<LoadMetadataDetails>()); - } - - public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName, - String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException { - LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails(); - loadMetadataDetails.setLoadEndTime(System.currentTimeMillis()); - loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS); - loadMetadataDetails.setLoadName(String.valueOf(0)); - loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime())); - listOfLoadFolderDetails.add(loadMetadataDetails); - - String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator - + CarbonTablePath.TABLE_STATUS_FILE; - - DataOutputStream dataOutputStream; - Gson gsonObjectToWrite = new Gson(); - BufferedWriter brWriter = null; - - AtomicFileOperations writeOperation = - new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation)); - - try { - - dataOutputStream = 
writeOperation.openForWrite(FileWriteOperation.OVERWRITE); - brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); - - String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray()); - brWriter.write(metadataInstance); - } finally { - try { - if (null != brWriter) { - brWriter.flush(); - } - } catch (Exception e) { - throw e; - - } - CarbonUtil.closeStreams(brWriter); - - } - writeOperation.close(); - - } - - public static String readCurrentTime() { - SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS); - String date = null; - - date = sdf.format(new Date()); - - return date; - } - - public static void main(String[] args) throws Exception { - StoreCreator.createCarbonStore(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml index 16f327d..f011a75 100644 --- a/integration/spark-common/pom.xml +++ b/integration/spark-common/pom.xml @@ -36,7 +36,7 @@ <dependencies> <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-hadoop</artifactId> + <artifactId>carbondata-streaming</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java index 6e9e0a6..f6dc65b 100644 --- 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java @@ -20,10 +20,19 @@ package org.apache.carbondata.spark.util; import java.io.Serializable; import java.math.BigDecimal; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.util.DataTypeConverter; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; import org.apache.spark.unsafe.types.UTF8String; /** @@ -90,4 +99,76 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri public Object wrapWithGenericRow(Object[] fields) { return new GenericInternalRow(fields); } + + private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType( + DataType carbonDataType) { + if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) { + return DataTypes.StringType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) { + return DataTypes.ShortType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) { + return DataTypes.IntegerType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) { + return DataTypes.LongType; + } 
else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) { + return DataTypes.DoubleType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) { + return DataTypes.BooleanType; + } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) { + return DataTypes.createDecimalType(); + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) { + return DataTypes.TimestampType; + } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) { + return DataTypes.DateType; + } else { + return null; + } + } + + /** + * convert from CarbonColumn array to Spark's StructField array + */ + @Override + public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) { + StructField[] fields = new StructField[carbonColumns.length]; + for (int i = 0; i < carbonColumns.length; i++) { + CarbonColumn carbonColumn = carbonColumns[i]; + if (carbonColumn.isDimension()) { + if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) { + DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(carbonColumn.getDataType()); + fields[i] = new StructField(carbonColumn.getColName(), + convertCarbonToSparkDataType(generator.getReturnType()), true, null); + } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) { + fields[i] = new StructField(carbonColumn.getColName(), + convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null); + } else if (carbonColumn.isComplex()) { + fields[i] = new StructField(carbonColumn.getColName(), + convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null); + } else { + fields[i] = new StructField(carbonColumn.getColName(), + convertCarbonToSparkDataType( + org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null); + } + } else if (carbonColumn.isMeasure()) { + DataType dataType = 
carbonColumn.getDataType(); + if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN + || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT + || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT + || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) { + fields[i] = new StructField(carbonColumn.getColName(), + convertCarbonToSparkDataType(dataType), true, null); + } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) { + CarbonMeasure measure = (CarbonMeasure) carbonColumn; + fields[i] = new StructField(carbonColumn.getColName(), + new DecimalType(measure.getPrecision(), measure.getScale()), true, null); + } else { + fields[i] = new StructField(carbonColumn.getColName(), + convertCarbonToSparkDataType( + org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null); + } + } + } + return fields; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 2be0efc..7e549a6 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -53,10 +53,11 @@ import org.apache.carbondata.core.statusmanager.FileFormat import org.apache.carbondata.core.util._ import org.apache.carbondata.hadoop._ import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableInputFormat} -import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} +import 
org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.InitInputMetrics import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util} +import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} /** * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
