Repository: incubator-carbondata
Updated Branches:
  refs/heads/master a8ed450bf -> 4397d0599


changed max columns from a static value to a configurable option


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0a09472a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0a09472a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0a09472a

Branch: refs/heads/master
Commit: 0a09472a4e710caa3960188b7ab0a405abdc9abc
Parents: a8ed450
Author: kunal642 <kunal.kap...@knoldus.in>
Authored: Sat Apr 15 13:48:17 2017 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Mon Apr 17 13:32:06 2017 +0530

----------------------------------------------------------------------
 .../hadoop/test/util/StoreCreator.java          |  3 ++
 .../TestDataLoadWithColumnsMoreThanSchema.scala | 36 ++++++-------
 .../dataload/TestLoadDataWithHiveSyntax.scala   |  2 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  3 ++
 .../carbondata/spark/util/CommonUtil.scala      | 57 +++++++++++++++++++-
 .../execution/command/carbonTableSchema.scala   |  4 +-
 .../dataload/SparkDatasourceSuite.scala         |  1 -
 .../util/ExternalColumnDictionaryTestCase.scala |  1 +
 ...GlobalDictionaryUtilConcurrentTestCase.scala | 12 +++--
 .../util/GlobalDictionaryUtilTestCase.scala     |  1 +
 .../execution/command/carbonTableSchema.scala   |  5 +-
 .../processing/csvload/CSVInputFormat.java      | 25 ++++++++-
 .../processing/model/CarbonLoadModel.java       |  1 +
 .../newflow/CarbonDataLoadConfiguration.java    | 12 +++++
 .../carbondata/processing/StoreCreator.java     |  3 ++
 .../processing/csvload/CSVInputFormatTest.java  |  2 +
 16 files changed, 136 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
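
In practice the limit is driven by the existing MAXCOLUMNS load option rather
than a hard-coded parser value; a minimal usage sketch (the table name and CSV
path are hypothetical, the option itself comes from the tests below):

    // Hypothetical load exercising the now-configurable max columns value.
    sql("CREATE TABLE t1 (imei string, age int) STORED BY 'org.apache.carbondata.format'")
    sql("LOAD DATA LOCAL INPATH '/tmp/data.csv' INTO TABLE t1 OPTIONS('MAXCOLUMNS'='2000')")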


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 51ce2c5..2997e94 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -166,6 +166,7 @@ public class StoreCreator {
       loadModel.setSegmentId("0");
       loadModel.setPartitionId("0");
       loadModel.setFactTimeStamp(System.currentTimeMillis());
+      loadModel.setMaxColumns("10");
 
       executeGraph(loadModel, absoluteTableIdentifier.getStorePath());
 
@@ -399,6 +400,8 @@ public class StoreCreator {
     CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
             CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
+    CSVInputFormat.setNumberOfColumns(configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
+    CSVInputFormat.setMaxColumns(configuration, "10");
 
     TaskAttemptContextImpl hadoopAttemptContext = new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
     CSVInputFormat format = new CSVInputFormat();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
index 9711051..c25e520 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
@@ -84,46 +84,42 @@ class TestDataLoadWithColumnsMoreThanSchema extends QueryTest with BeforeAndAfte
   }
 
   test("test for maxcolumns option value greater than threshold value for 
maxcolumns") {
-    sql("DROP TABLE IF EXISTS valid_max_columns_test")
-    sql("CREATE TABLE valid_max_columns_test (imei string,age int,task 
bigint,num double,level decimal(10,3),productdate timestamp,mark int,name 
string)STORED BY 'org.apache.carbondata.format'")
-    try {
+    intercept[Exception] {
+      sql("DROP TABLE IF EXISTS valid_max_columns_test")
+      sql(
+        "CREATE TABLE valid_max_columns_test (imei string,age int,task 
bigint,num double,level decimal(10,3),productdate timestamp,mark int,name 
string)STORED BY 'org.apache.carbondata.format'")
       sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/character_carbon.csv' into 
table valid_max_columns_test options('MAXCOLUMNS'='22000')")
-      checkAnswer(sql("select count(*) from valid_max_columns_test"),
-        sql("select count(*) from hive_char_test"))
-    } catch {
-      case _: Throwable => assert(false)
     }
   }
 
   test("test for boundary value for maxcolumns") {
-    sql("DROP TABLE IF EXISTS boundary_max_columns_test")
-    sql("CREATE TABLE boundary_max_columns_test (empno string, empname String, 
designation String, doj String, " +
+    intercept[Exception] {
+      sql("DROP TABLE IF EXISTS boundary_max_columns_test")
+      sql(
+        "CREATE TABLE boundary_max_columns_test (empno string, empname String, 
designation " +
+        "String, doj String, " +
         "workgroupcategory string, workgroupcategoryname String, deptno 
string, deptname String, " +
         "projectcode string, projectjoindate String, projectenddate 
String,attendance double," +
         "utilization double,salary double) STORED BY 
'org.apache.carbondata.format' TBLPROPERTIES" +
         
"('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," +
         
"workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')")
-    try {
-      sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' into table 
boundary_max_columns_test options('MAXCOLUMNS'='14')")
-      assert(true)
-    } catch {
-      case _: Throwable => assert(false)
+      sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' into table 
boundary_max_columns_test" +
+          s" options('MAXCOLUMNS'='14')")
+
     }
   }
 
   test("test for maxcolumns value less than columns in 1st line of csv file") {
-    sql("DROP TABLE IF EXISTS boundary_max_columns_test")
-    sql("CREATE TABLE boundary_max_columns_test (empno string, empname String, 
designation String, doj String, " +
+    intercept[Exception] {
+      sql("DROP TABLE IF EXISTS boundary_max_columns_test")
+      sql(
+        "CREATE TABLE boundary_max_columns_test (empno string, empname String, 
designation String, doj String, " +
         "workgroupcategory string, workgroupcategoryname String, deptno 
string, deptname String, " +
         "projectcode string, projectjoindate String, projectenddate 
String,attendance double," +
         "utilization double,salary double) STORED BY 
'org.apache.carbondata.format' TBLPROPERTIES" +
         
"('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," +
         
"workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')")
-    try {
       sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' into table 
boundary_max_columns_test options('MAXCOLUMNS'='13')")
-      assert(true)
-    } catch {
-      case _: Throwable => assert(false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
index 353db9e..561b0d1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntax.scala
@@ -651,7 +651,7 @@ class TestLoadDataWithHiveSyntax extends QueryTest with BeforeAndAfterAll {
     )
     sql(
       s"LOAD DATA local inpath '$resourcesPath/comment.csv' INTO TABLE comment_test " +
-        "options('DELIMITER' = ',', 'QUOTECHAR' = '.', 'COMMENTCHAR' = '?','FILEHEADER'='imei,age,task,num,level,productdate,mark,name')"
+        "options('DELIMITER' = ',', 'QUOTECHAR' = '.', 'COMMENTCHAR' = '?','FILEHEADER'='imei,age,task,num,level,productdate,mark,name', 'maxcolumns'='180')"
     )
     checkAnswer(sql("select imei from comment_test"),Seq(Row("\".carbon"),Row("#?carbon"), Row(""),
       Row("~carbon,")))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 350a2ec..49984c9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -64,6 +64,7 @@ class CarbonMergerRDD[K, V](
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
   sc.setLocalProperty("spark.job.interruptOnCancel", "true")
 
+  private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   var storeLocation: String = null
   var mergeResult: String = null
   val hdfsStoreLocation = carbonMergerMapping.hdfsStoreLocation
@@ -260,6 +261,8 @@ class CarbonMergerRDD[K, V](
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    // initialise query_id for job
+    job.getConfiguration.set("query.id", queryId)
     var defaultParallelism = sparkContext.defaultParallelism
     val result = new java.util.ArrayList[Partition](defaultParallelism)
     var partitionNo = 0

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 7592e4e..679a4e7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -273,6 +273,9 @@ object CommonUtil {
     CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
     CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter)
     CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar)
+    CSVInputFormat.setMaxColumns(configuration, carbonLoadModel.getMaxColumns)
+    CSVInputFormat.setNumberOfColumns(configuration, carbonLoadModel.getCsvHeaderColumns.length
+      .toString)
     CSVInputFormat.setHeaderExtractionEnabled(configuration,
       carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty)
     CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar)
@@ -342,7 +345,59 @@ object CommonUtil {
           + "the same. Input file : " + csvFile)
       }
     }
-
     csvColumns
   }
+
+  def validateMaxColumns(csvHeaders: Array[String], maxColumns: String): Int = {
+    /*
+    User configures both csvheadercolumns and maxcolumns:
+      if csvheadercolumns >= maxcolumns, give an error
+      if maxcolumns > threshold, give an error
+    User configures only csvheadercolumns:
+      if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns + 1
+      if csvheadercolumns >= threshold, give an error
+    User configures nothing:
+      if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns + 1
+      if csvheadercolumns >= threshold, give an error
+     */
+    val columnCountInSchema = csvHeaders.length
+    var maxNumberOfColumnsForParsing = 0
+    val maxColumnsInt = getMaxColumnValue(maxColumns)
+    if (maxColumnsInt != null) {
+      if (columnCountInSchema >= maxColumnsInt) {
+        sys.error(s"csv headers should be less than the max columns: $maxColumnsInt")
+      } else if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+        sys.error(s"max columns cannot be greater than the threshold value: ${
+          CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING
+        }")
+      } else {
+        maxNumberOfColumnsForParsing = maxColumnsInt
+      }
+    } else if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+      sys.error(s"csv header columns should be less than max threshold: ${
+        CSVInputFormat
+          .THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING
+      }")
+    } else if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+      maxNumberOfColumnsForParsing = columnCountInSchema + 1
+    } else {
+      maxNumberOfColumnsForParsing = CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING
+    }
+    maxNumberOfColumnsForParsing
+  }
+
+  private def getMaxColumnValue(maxColumn: String): Integer = {
+    if (maxColumn != null) {
+      try {
+        maxColumn.toInt
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Invalid value for max column in load options ${ e.getMessage }")
+          null
+      }
+    } else {
+      null
+    }
+  }
+
 }
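
For reference, the resolution rules spelled out in the validateMaxColumns comment
play out as follows. This is an illustrative sketch only: the header array is
hypothetical, and the 2000/20000 figures are the DEFAULT/THRESHOLD constants
added to CSVInputFormat in this commit.

    // Hypothetical 14-column header, mirroring the boundary tests above.
    val headers = (1 to 14).map("col" + _).toArray
    CommonUtil.validateMaxColumns(headers, null)    // no option set    -> 2000 (default)
    CommonUtil.validateMaxColumns(headers, "100")   // valid user value -> 100
    // validateMaxColumns(headers, "14")    -> error: headers must be fewer than maxcolumns
    // validateMaxColumns(headers, "22000") -> error: exceeds the 20000 parsing threshold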

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 5a22e9c..15472e5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -404,7 +404,6 @@ case class LoadTable(
       validateDateFormat(dateFormat, table)
       val maxColumns = options.getOrElse("maxcolumns", null)
 
-      carbonLoadModel.setMaxColumns(checkDefaultValue(maxColumns, null))
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
@@ -474,6 +473,9 @@ case class LoadTable(
         carbonLoadModel.setColDictFilePath(columnDict)
         carbonLoadModel.setDirectLoad(true)
         carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+        val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
+          maxColumns)
+        carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata

         if (carbonLoadModel.getUseOnePass) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
index 4c5b241..0b64759 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.integration.spark.testsuite.dataload
 
-import java.io.File
 
 import org.apache.spark.sql.common.util.QueryTest
 import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 5a986b7..05b94ee 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -148,6 +148,7 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll
       CarbonCommonConstants.CARBON_DATE_FORMAT,
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+    carbonLoadModel.setMaxColumns("100")
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
index 377bbaa..9e0f851 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.spark.util
 
-import java.io.File
 import java.util.concurrent.{Callable, Executors}
 
 import scala.collection.mutable.ListBuffer
@@ -64,6 +63,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
       CarbonCommonConstants.CARBON_DATE_FORMAT,
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+    carbonLoadModel.setMaxColumns("2000")
     carbonLoadModel
   }
 
@@ -88,6 +88,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
       "employee")(sqlContext)
       .asInstanceOf[CarbonRelation]
   }
+
   def writedummydata(filePath: String, recCount: Int) = {
     var a: Int = 0
     var records: StringBuilder = StringBuilder.newBuilder
@@ -98,6 +99,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
     dis.writeBytes(records.toString())
     dis.close()
   }
+
   test("concurrent dictionary generation") {
     CarbonProperties.getInstance.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "-1")
     val noOfFiles = 5
@@ -114,8 +116,8 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
       for (i <- 0 until noOfFiles) {
         dictGenerators.add(new DictGenerator(loadModels(i)))
       }
-      val executorService = Executors.newFixedThreadPool(10);
-      val results = executorService.invokeAll(dictGenerators);
+      val executorService = Executors.newFixedThreadPool(10)
+      val results = executorService.invokeAll(dictGenerators)
       for (i <- 0 until noOfFiles) {
         val res = results.get(i).get
         assert("Pass".equals(res))
@@ -128,7 +130,7 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
     val carbonTableIdentifier = sampleRelation.tableMeta.carbonTable.getCarbonTableIdentifier
     val columnIdentifier = sampleRelation.tableMeta.carbonTable.getDimensionByName("employee", "empid").getColumnIdentifier
     val carbonTablePath = PathFactory.getInstance()
-        .getCarbonTablePath(sampleRelation.tableMeta.storePath, carbonTableIdentifier);
+        .getCarbonTablePath(sampleRelation.tableMeta.storePath, carbonTableIdentifier)
     val dictPath = carbonTablePath.getDictionaryFilePath(columnIdentifier.getColumnId)
     val dictFile = FileFactory.getCarbonFile(dictPath, FileFactory.getFileType(dictPath))
     val offSet = dictFile.getSize
@@ -146,11 +148,13 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
       file.delete()
     }
   }
+
   override def afterAll {
     sql("drop table if exists employee")
     CarbonProperties.getInstance.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
         Integer.toString(CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME))
   }
+
   class DictGenerator(loadModel: CarbonLoadModel) extends Callable[String] {
    override def call:String = {
      var result = "Pass"

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
index 189e694..c4b213f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
@@ -70,6 +70,7 @@ class GlobalDictionaryUtilTestCase extends QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.CARBON_DATE_FORMAT,
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+    carbonLoadModel.setMaxColumns("2000")
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 1451247..c8e0436 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -405,7 +405,6 @@ case class LoadTable(
       val dateFormat = options.getOrElse("dateformat", null)
       validateDateFormat(dateFormat, table)
       val maxColumns = options.getOrElse("maxcolumns", null)
-      carbonLoadModel.setMaxColumns(checkDefaultValue(maxColumns, null))
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
@@ -472,8 +471,10 @@ case class LoadTable(
         carbonLoadModel.setColDictFilePath(columnDict)
         carbonLoadModel.setDirectLoad(true)
         carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+        val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
+          maxColumns)
+        carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
         GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
-
         if (carbonLoadModel.getUseOnePass) {
           val colDictFilePath = carbonLoadModel.getColDictFilePath
           if (colDictFilePath != null) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
index 1f7d403..e252e7f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
@@ -21,6 +21,9 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
 import org.apache.hadoop.conf.Configuration;
@@ -63,6 +66,14 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
   public static final boolean HEADER_PRESENT_DEFAULT = false;
   public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
   public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+  public static final String MAX_COLUMNS = "carbon.csvinputformat.max.columns";
+  public static final String NUMBER_OF_COLUMNS = "carbon.csvinputformat.number.of.columns";
+  public static final int DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 2000;
+  public static final int THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 20000;
+
+  private static LogService LOGGER =
+      LogServiceFactory.getLogService(CSVInputFormat.class.toString());
+
 
   @Override
   public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit,
@@ -145,6 +156,16 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
     }
   }
 
+  public static void setMaxColumns(Configuration configuration, String maxColumns) {
+    if (maxColumns != null) {
+      configuration.set(MAX_COLUMNS, maxColumns);
+    }
+  }
+
+  public static void setNumberOfColumns(Configuration configuration, String numberOfColumns) {
+    configuration.set(NUMBER_OF_COLUMNS, numberOfColumns);
+  }
+
   /**
    * Treats value as line in file. Key is null.
    */
@@ -220,8 +241,8 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
       parserSettings.setIgnoreTrailingWhitespaces(false);
       parserSettings.setSkipEmptyLines(false);
       parserSettings.setMaxCharsPerColumn(100000);
-      // TODO get from csv file.
-      parserSettings.setMaxColumns(1000);
+      String maxColumns = job.get(MAX_COLUMNS);
+      parserSettings.setMaxColumns(Integer.parseInt(maxColumns));
       parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
       parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
       if (start == 0) {
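
Callers outside the Spark layer can wire the two new keys directly through the
static setters added above; a minimal sketch (in Scala for consistency with the
rest of the patch, with hypothetical "2000"/"7" values):

    // Configure the parser limit and the actual CSV column count.
    val conf = new org.apache.hadoop.conf.Configuration()
    CSVInputFormat.setMaxColumns(conf, "2000")    // carbon.csvinputformat.max.columns
    CSVInputFormat.setNumberOfColumns(conf, "7")  // carbon.csvinputformat.number.of.columns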

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 525874f..d8f84bf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -171,6 +171,7 @@ public class CarbonLoadModel implements Serializable {
    */
   private boolean preFetch;
 
+  private String numberOfcolumns;
   /**
    * get escape char
    *

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index 0bd3e45..12be777 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -46,6 +46,10 @@ public class CarbonDataLoadConfiguration {
 
   private BucketingInfo bucketingInfo;
 
+  private String numberOfColumns;
+
+  private String maxColumns;
+
   private Map<String, Object> dataLoadProperties = new HashMap<>();
 
   /**
@@ -185,6 +189,14 @@ public class CarbonDataLoadConfiguration {
     this.taskNo = taskNo;
   }
 
+  public void setMaxColumns(String maxColumns) {
+    this.maxColumns = maxColumns;
+  }
+
+  public void setNumberOfColumns(int numberOfColumns) {
+    this.numberOfColumns = String.valueOf(numberOfColumns);
+  }
+
   public void setDataLoadProperty(String key, Object value) {
     dataLoadProperties.put(key, value);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 85f8470..87f1190 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -166,6 +166,7 @@ public class StoreCreator {
       loadModel.setSegmentId("0");
       loadModel.setPartitionId("0");
       loadModel.setFactTimeStamp(System.currentTimeMillis());
+      loadModel.setMaxColumns("10");
 
       executeGraph(loadModel, absoluteTableIdentifier.getStorePath());
 
@@ -399,6 +400,8 @@ public class StoreCreator {
     CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
             CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
+    CSVInputFormat.setMaxColumns(configuration, "10");
+    CSVInputFormat.setNumberOfColumns(configuration, "7");
 
     TaskAttemptContextImpl hadoopAttemptContext = new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
     CSVInputFormat format = new CSVInputFormat();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0a09472a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
index 66aedb6..676838d 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
@@ -150,6 +150,8 @@ public class CSVInputFormatTest extends TestCase {
 
   private void prepareConf(Configuration conf) {
     conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true);
+    conf.set(CSVInputFormat.MAX_COLUMNS, "10");
+    conf.set(CSVInputFormat.NUMBER_OF_COLUMNS, "7");
   }
 
   private void deleteOutput(File output) {

Reply via email to