Repository: incubator-carbondata Updated Branches: refs/heads/master a8ed450bf -> 4397d0599
changed max columns from static value to configurable Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0a09472a Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0a09472a Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0a09472a Branch: refs/heads/master Commit: 0a09472a4e710caa3960188b7ab0a405abdc9abc Parents: a8ed450 Author: kunal642 <kunal.kap...@knoldus.in> Authored: Sat Apr 15 13:48:17 2017 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Mon Apr 17 13:32:06 2017 +0530 ---------------------------------------------------------------------- .../hadoop/test/util/StoreCreator.java | 3 ++ .../TestDataLoadWithColumnsMoreThanSchema.scala | 36 ++++++------- .../dataload/TestLoadDataWithHiveSyntax.scala | 2 +- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 3 ++ .../carbondata/spark/util/CommonUtil.scala | 57 +++++++++++++++++++- .../execution/command/carbonTableSchema.scala | 4 +- .../dataload/SparkDatasourceSuite.scala | 1 - .../util/ExternalColumnDictionaryTestCase.scala | 1 + ...GlobalDictionaryUtilConcurrentTestCase.scala | 12 +++-- .../util/GlobalDictionaryUtilTestCase.scala | 1 + .../execution/command/carbonTableSchema.scala | 5 +- .../processing/csvload/CSVInputFormat.java | 25 ++++++++- .../processing/model/CarbonLoadModel.java | 1 + .../newflow/CarbonDataLoadConfiguration.java | 12 +++++ .../carbondata/processing/StoreCreator.java | 3 ++ .../processing/csvload/CSVInputFormatTest.java | 2 + 16 files changed, 136 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git 
a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index 51ce2c5..2997e94 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -166,6 +166,7 @@ public class StoreCreator { loadModel.setSegmentId("0"); loadModel.setPartitionId("0"); loadModel.setFactTimeStamp(System.currentTimeMillis()); + loadModel.setMaxColumns("10"); executeGraph(loadModel, absoluteTableIdentifier.getStorePath()); @@ -399,6 +400,8 @@ public class StoreCreator { CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT)); + CSVInputFormat.setNumberOfColumns(configuration, String.valueOf(loadModel.getCsvHeaderColumns().length)); + CSVInputFormat.setMaxColumns(configuration, "10"); TaskAttemptContextImpl hadoopAttemptContext = new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0)); CSVInputFormat format = new CSVInputFormat(); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala index 9711051..c25e520 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala @@ -84,46 +84,42 @@ class TestDataLoadWithColumnsMoreThanSchema extends QueryTest with BeforeAndAfte } test("test for maxcolumns option value greater than threshold value for maxcolumns") { - sql("DROP TABLE IF EXISTS valid_max_columns_test") - sql("CREATE TABLE valid_max_columns_test (imei string,age int,task bigint,num double,level decimal(10,3),productdate timestamp,mark int,name string)STORED BY 'org.apache.carbondata.format'") - try { + intercept[Exception] { + sql("DROP TABLE IF EXISTS valid_max_columns_test") + sql( + "CREATE TABLE valid_max_columns_test (imei string,age int,task bigint,num double,level decimal(10,3),productdate timestamp,mark int,name string)STORED BY 'org.apache.carbondata.format'") sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/character_carbon.csv' into table valid_max_columns_test options('MAXCOLUMNS'='22000')") - checkAnswer(sql("select count(*) from valid_max_columns_test"), - sql("select count(*) from hive_char_test")) - } catch { - case _: Throwable => assert(false) } } test("test for boundary value for maxcolumns") { - sql("DROP TABLE IF EXISTS boundary_max_columns_test") - sql("CREATE TABLE boundary_max_columns_test (empno string, empname String, designation String, doj String, " + + intercept[Exception] { + sql("DROP TABLE IF EXISTS boundary_max_columns_test") + sql( + "CREATE TABLE boundary_max_columns_test (empno string, empname String, designation " + + "String, doj String, " + "workgroupcategory string, workgroupcategoryname String, deptno string, deptname String, " + "projectcode string, projectjoindate String, projectenddate String,attendance double," + "utilization double,salary double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES" + "('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," + 
"workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')") - try { - sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' into table boundary_max_columns_test options('MAXCOLUMNS'='14')") - assert(true) - } catch { - case _: Throwable => assert(false) + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' into table boundary_max_columns_test" + + s" options('MAXCOLUMNS'='14')") + } } test("test for maxcolumns value less than columns in 1st line of csv file") { - sql("DROP TABLE IF EXISTS boundary_max_columns_test") - sql("CREATE TABLE boundary_max_columns_test (empno string, empname String, designation String, doj String, " + + intercept[Exception] { + sql("DROP TABLE IF EXISTS boundary_max_columns_test") + sql( + "CREATE TABLE boundary_max_columns_test (empno string, empname String, designation String, doj String, " + "workgroupcategory string, workgroupcategoryname String, deptno string, deptname String, " + "projectcode string, projectjoindate String, projectenddate String,attendance double," + "utilization double,salary double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES" + "('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," + "workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')") - try { sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' into table boundary_max_columns_test options('MAXCOLUMNS'='13')") - assert(true) - } catch { - case _: Throwable => assert(false) } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala index 353db9e..561b0d1 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala @@ -651,7 +651,7 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll { ) sql( s"LOAD DATA local inpath '$resourcesPath/comment.csv' INTO TABLE comment_test " + - "options('DELIMITER' = ',', 'QUOTECHAR' = '.', 'COMMENTCHAR' = '?','FILEHEADER'='imei,age,task,num,level,productdate,mark,name')" + "options('DELIMITER' = ',', 'QUOTECHAR' = '.', 'COMMENTCHAR' = '?','FILEHEADER'='imei,age,task,num,level,productdate,mark,name', 'maxcolumns'='180')" ) checkAnswer(sql("select imei from comment_test"),Seq(Row("\".carbon"),Row("#?carbon"), Row(""), Row("~carbon,"))) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 350a2ec..49984c9 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -64,6 +64,7 @@ class CarbonMergerRDD[K, V]( sc.setLocalProperty("spark.scheduler.pool", "DDL") sc.setLocalProperty("spark.job.interruptOnCancel", "true") + private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") var storeLocation: String = null var mergeResult: String = null val 
hdfsStoreLocation = carbonMergerMapping.hdfsStoreLocation @@ -260,6 +261,8 @@ class CarbonMergerRDD[K, V]( val jobConf: JobConf = new JobConf(new Configuration) val job: Job = new Job(jobConf) val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) + // initialise query_id for job + job.getConfiguration.set("query.id", queryId) var defaultParallelism = sparkContext.defaultParallelism val result = new java.util.ArrayList[Partition](defaultParallelism) var partitionNo = 0 http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 7592e4e..679a4e7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -273,6 +273,9 @@ object CommonUtil { CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar) CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter) CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar) + CSVInputFormat.setMaxColumns(configuration, carbonLoadModel.getMaxColumns) + CSVInputFormat.setNumberOfColumns(configuration, carbonLoadModel.getCsvHeaderColumns.length + .toString) CSVInputFormat.setHeaderExtractionEnabled(configuration, carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty) CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar) @@ -342,7 +345,59 @@ object CommonUtil { + "the same. 
Input file : " + csvFile) } } - csvColumns } + + def validateMaxColumns(csvHeaders: Array[String], maxColumns: String): Int = { + /* + User configures both csvheadercolumns, maxcolumns, + if csvheadercolumns >= maxcolumns, give error + if maxcolumns > threshold, give error + User configures csvheadercolumns + if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1 + if csvheadercolumns >= threshold, give error + User configures nothing + if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1 + if csvheadercolumns >= threshold, give error + */ + val columnCountInSchema = csvHeaders.length + var maxNumberOfColumnsForParsing = 0 + val maxColumnsInt = getMaxColumnValue(maxColumns) + if (maxColumnsInt != null) { + if (columnCountInSchema >= maxColumnsInt) { + sys.error(s"csv headers should be less than the max columns: $maxColumnsInt") + } else if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) { + sys.error(s"max columns cannot be greater than the threshold value: ${ + CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING + }") + } else { + maxNumberOfColumnsForParsing = maxColumnsInt + } + } else if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) { + sys.error(s"csv header columns should be less than max threashold: ${ + CSVInputFormat + .THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING + }") + } else if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) { + maxNumberOfColumnsForParsing = columnCountInSchema + 1 + } else { + maxNumberOfColumnsForParsing = CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING + } + maxNumberOfColumnsForParsing + } + + private def getMaxColumnValue(maxColumn: String): Integer = { + if (maxColumn != null) { + try { + maxColumn.toInt + } catch { + case e: Exception => + LOGGER.error(s"Invalid value for max column in load options ${ e.getMessage }") + null + } + } else { + null + } + } + } 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 5a22e9c..15472e5 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -404,7 +404,6 @@ case class LoadTable( validateDateFormat(dateFormat, table) val maxColumns = options.getOrElse("maxcolumns", null) - carbonLoadModel.setMaxColumns(checkDefaultValue(maxColumns, null)) carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\")) carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#")) @@ -474,6 +473,9 @@ case class LoadTable( carbonLoadModel.setColDictFilePath(columnDict) carbonLoadModel.setDirectLoad(true) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) + val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns, + maxColumns) + carbonLoadModel.setMaxColumns(validatedMaxColumns.toString) GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata if (carbonLoadModel.getUseOnePass) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala index 4c5b241..0b64759 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala @@ -17,7 +17,6 @@ package org.apache.carbondata.integration.spark.testsuite.dataload -import java.io.File import org.apache.spark.sql.common.util.QueryTest import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index 5a986b7..05b94ee 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -148,6 +148,7 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) + carbonLoadModel.setMaxColumns("100") carbonLoadModel } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala index 377bbaa..9e0f851 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala @@ -16,7 +16,6 @@ */ package org.apache.carbondata.spark.util -import java.io.File import java.util.concurrent.{Callable, Executors} import scala.collection.mutable.ListBuffer @@ -64,6 +63,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) + carbonLoadModel.setMaxColumns("2000") carbonLoadModel } @@ -88,6 +88,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft "employee")(sqlContext) .asInstanceOf[CarbonRelation] } + def writedummydata(filePath: String, recCount: Int) = { var a: Int = 0 var records: StringBuilder = StringBuilder.newBuilder @@ -98,6 +99,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft dis.writeBytes(records.toString()) dis.close() } + test("concurrent dictionary generation") { CarbonProperties.getInstance.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "-1") val noOfFiles = 5 @@ -114,8 +116,8 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft for (i <- 0 until noOfFiles) { dictGenerators.add(new DictGenerator(loadModels(i))) } - val executorService = Executors.newFixedThreadPool(10); - val results = executorService.invokeAll(dictGenerators); + val executorService = Executors.newFixedThreadPool(10) + val results = 
executorService.invokeAll(dictGenerators) for (i <- 0 until noOfFiles) { val res = results.get(i).get assert("Pass".equals(res)) @@ -128,7 +130,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft val carbonTableIdentifier = sampleRelation.tableMeta.carbonTable.getCarbonTableIdentifier val columnIdentifier = sampleRelation.tableMeta.carbonTable.getDimensionByName("employee", "empid").getColumnIdentifier val carbonTablePath = PathFactory.getInstance() - .getCarbonTablePath(sampleRelation.tableMeta.storePath, carbonTableIdentifier); + .getCarbonTablePath(sampleRelation.tableMeta.storePath, carbonTableIdentifier) val dictPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId) val dictFile = FileFactory.getCarbonFile(dictPath, FileFactory.getFileType(dictPath)) val offSet = dictFile.getSize @@ -146,11 +148,13 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft file.delete() } } + override def afterAll { sql("drop table if exists employee") CarbonProperties.getInstance.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, Integer.toString(CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME)) } + class DictGenerator(loadModel: CarbonLoadModel) extends Callable[String] { override def call:String = { var result = "Pass" http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala index 189e694..c4b213f 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala +++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala @@ -70,6 +70,7 @@ class GlobalDictionaryUtilTestCase extends QueryTest with BeforeAndAfterAll { CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) + carbonLoadModel.setMaxColumns("2000") carbonLoadModel } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 1451247..c8e0436 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -405,7 +405,6 @@ case class LoadTable( val dateFormat = options.getOrElse("dateformat", null) validateDateFormat(dateFormat, table) val maxColumns = options.getOrElse("maxcolumns", null) - carbonLoadModel.setMaxColumns(checkDefaultValue(maxColumns, null)) carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\")) carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\"")) carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#")) @@ -472,8 +471,10 @@ case class LoadTable( carbonLoadModel.setColDictFilePath(columnDict) carbonLoadModel.setDirectLoad(true) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) + val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns, + maxColumns) + carbonLoadModel.setMaxColumns(validatedMaxColumns.toString) 
GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata - if (carbonLoadModel.getUseOnePass) { val colDictFilePath = carbonLoadModel.getColDictFilePath if (colDictFilePath != null) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java index 1f7d403..e252e7f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java +++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java @@ -21,6 +21,9 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; + import com.univocity.parsers.csv.CsvParser; import com.univocity.parsers.csv.CsvParserSettings; import org.apache.hadoop.conf.Configuration; @@ -63,6 +66,14 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri public static final boolean HEADER_PRESENT_DEFAULT = false; public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size"; public static final String READ_BUFFER_SIZE_DEFAULT = "65536"; + public static final String MAX_COLUMNS = "carbon.csvinputformat.max.columns"; + public static final String NUMBER_OF_COLUMNS = "carbon.csvinputformat.number.of.columns"; + public static final int DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 2000; + public static final int THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 20000; + + private static LogService LOGGER = + LogServiceFactory.getLogService(CSVInputFormat.class.toString()); + @Override public RecordReader<NullWritable, 
StringArrayWritable> createRecordReader(InputSplit inputSplit, @@ -145,6 +156,16 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri } } + public static void setMaxColumns(Configuration configuration, String maxColumns) { + if (maxColumns != null) { + configuration.set(MAX_COLUMNS, maxColumns); + } + } + + public static void setNumberOfColumns(Configuration configuration, String numberOfColumns) { + configuration.set(NUMBER_OF_COLUMNS, numberOfColumns); + } + /** * Treats value as line in file. Key is null. */ @@ -220,8 +241,8 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri parserSettings.setIgnoreTrailingWhitespaces(false); parserSettings.setSkipEmptyLines(false); parserSettings.setMaxCharsPerColumn(100000); - // TODO get from csv file. - parserSettings.setMaxColumns(1000); + String maxColumns = job.get(MAX_COLUMNS); + parserSettings.setMaxColumns(Integer.parseInt(maxColumns)); parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0)); parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0)); if (start == 0) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java index 525874f..d8f84bf 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java @@ -171,6 +171,7 @@ public class CarbonLoadModel implements Serializable { */ private boolean preFetch; + private String numberOfcolumns; /** * get escape char * 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java index 0bd3e45..12be777 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java @@ -46,6 +46,10 @@ public class CarbonDataLoadConfiguration { private BucketingInfo bucketingInfo; + private String numberOfColumns; + + private String maxColumns; + private Map<String, Object> dataLoadProperties = new HashMap<>(); /** @@ -185,6 +189,14 @@ public class CarbonDataLoadConfiguration { this.taskNo = taskNo; } + public void setMaxColumns(String maxColumns) { + this.maxColumns = maxColumns; + } + + public void setNumberOfColumns(int numberOfColumns) { + this.numberOfColumns = String.valueOf(numberOfColumns); + } + public void setDataLoadProperty(String key, Object value) { dataLoadProperties.put(key, value); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index 85f8470..87f1190 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -166,6 +166,7 @@ public class StoreCreator { loadModel.setSegmentId("0"); 
loadModel.setPartitionId("0"); loadModel.setFactTimeStamp(System.currentTimeMillis()); + loadModel.setMaxColumns("10"); executeGraph(loadModel, absoluteTableIdentifier.getStorePath()); @@ -399,6 +400,8 @@ public class StoreCreator { CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT)); + CSVInputFormat.setMaxColumns(configuration, "10"); + CSVInputFormat.setNumberOfColumns(configuration, "7"); TaskAttemptContextImpl hadoopAttemptContext = new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0)); CSVInputFormat format = new CSVInputFormat(); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java index 66aedb6..676838d 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java +++ b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java @@ -150,6 +150,8 @@ public class CSVInputFormatTest extends TestCase { private void prepareConf(Configuration conf) { conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true); + conf.set(CSVInputFormat.MAX_COLUMNS, "10"); + conf.set(CSVInputFormat.NUMBER_OF_COLUMNS, "7"); } private void deleteOutput(File output) {