[ 
https://issues.apache.org/jira/browse/CARBONDATA-1625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhichao  Zhang updated CARBONDATA-1625:
---------------------------------------
    Description: 
I am using Spark 2.1 + CarbonData 1.2, and find that if 
enable.unsafe.sort=true, the length of bytes of column exceed 32768, it will 
load data unsuccessfully. 

My test code: 
    
{code:java}
val longStr = sb.toString()  // the getBytes length of longStr exceeds 32768 
    println(longStr.length()) 
    println(longStr.getBytes("UTF-8").length) 
    
    import spark.implicits._ 
    val df1 = spark.sparkContext.parallelize(0 to 1000) 
      .map(x => ("a", x.toString(), longStr, x, x.toLong, x * 2)) 
      .toDF("stringField1", "stringField2", "stringField3", "intField", 
"longField", "int2Field") 
      
    val df2 = spark.sparkContext.parallelize(1001 to 2000) 
      .map(x => ("b", x.toString(), (x % 2).toString(), x, x.toLong, x * 2)) 
      .toDF("stringField1", "stringField2", "stringField3", "intField", 
"longField", "int2Field") 
      
    val df3 = df1.union(df2) 
    val tableName = "study_carbondata_test" 
    spark.sql(s"DROP TABLE IF EXISTS ${tableName} ").show() 
    val sortScope = "LOCAL_SORT"   // LOCAL_SORT   GLOBAL_SORT 
    spark.sql(s""" 
        |  CREATE TABLE IF NOT EXISTS ${tableName} ( 
        |    stringField1          string, 
        |    stringField2          string, 
        |    stringField3          string, 
        |    intField              int, 
        |    longField             bigint, 
        |    int2Field             int 
        |  ) 
        |  STORED BY 'carbondata' 
        |  TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2', 
        |    'SORT_COLUMNS'='stringField1, stringField2, intField, 
longField', 
        |    'SORT_SCOPE'='${sortScope}', 
        |    'NO_INVERTED_INDEX'='stringField3, int2Field', 
        |    'TABLE_BLOCKSIZE'='64' 
        |  ) 
       """.stripMargin) 
    df3.write 
      .format("carbondata")   
      .option("tableName", "study_carbondata_test") 
      .option("compress", "true")  // just valid when tempCSV is true 
      .option("tempCSV", "false") 
      .option("single_pass", "true") 
      .mode(SaveMode.Append) 
      .save()
{code}


The error message: 

{code:java}
*java.lang.NegativeArraySizeException 
        at 
org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage.getRow(UnsafeCarbonRowPage.java:182)
 
        at 
org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder.readRow(UnsafeInmemoryHolder.java:63)
 
        at 
org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startSorting(UnsafeSingleThreadFinalSortFilesMerger.java:114)
 
        at 
org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startFinalMerge(UnsafeSingleThreadFinalSortFilesMerger.java:81)
 
        at 
org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl.sort(UnsafeParallelReadMergeSorterImpl.java:105)
 
        at 
org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl.execute(SortProcessorStepImpl.java:62)
 
        at 
org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl.execute(DataWriterProcessorStepImpl.java:87)
 
        at 
org.apache.carbondata.processing.newflow.DataLoadExecutor.execute(DataLoadExecutor.java:51)
 
        at 
org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD$$anon$2.<init>(NewCarbonDataLoadRDD.scala:442)
 
        at 
org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD.internalCompute(NewCarbonDataLoadRDD.scala:405)
 
        at 
org.apache.carbondata.spark.rdd.CarbonRDD.compute(CarbonRDD.scala:62) 
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)* 
{code}


Currently, the length of column was stored by short type.

Introduce new datatype of  varchar(size) to store column length more than short 
limit.

  was:
I am using Spark 2.1 + CarbonData 1.2, and find that if 
enable.unsafe.sort=true, the length of bytes of column exceed 32768, it will 
load data unsuccessfully. 

My test code: 
    val longStr = sb.toString()  // the getBytes length of longStr exceeds 
32768 
    println(longStr.length()) 
    println(longStr.getBytes("UTF-8").length) 
    
    import spark.implicits._ 
    val df1 = spark.sparkContext.parallelize(0 to 1000) 
      .map(x => ("a", x.toString(), longStr, x, x.toLong, x * 2)) 
      .toDF("stringField1", "stringField2", "stringField3", "intField", 
"longField", "int2Field") 
      
    val df2 = spark.sparkContext.parallelize(1001 to 2000) 
      .map(x => ("b", x.toString(), (x % 2).toString(), x, x.toLong, x * 2)) 
      .toDF("stringField1", "stringField2", "stringField3", "intField", 
"longField", "int2Field") 
      
    val df3 = df1.union(df2) 
    val tableName = "study_carbondata_test" 
    spark.sql(s"DROP TABLE IF EXISTS ${tableName} ").show() 
    val sortScope = "LOCAL_SORT"   // LOCAL_SORT   GLOBAL_SORT 
    spark.sql(s""" 
        |  CREATE TABLE IF NOT EXISTS ${tableName} ( 
        |    stringField1          string, 
        |    stringField2          string, 
        |    stringField3          string, 
        |    intField              int, 
        |    longField             bigint, 
        |    int2Field             int 
        |  ) 
        |  STORED BY 'carbondata' 
        |  TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2', 
        |    'SORT_COLUMNS'='stringField1, stringField2, intField, 
longField', 
        |    'SORT_SCOPE'='${sortScope}', 
        |    'NO_INVERTED_INDEX'='stringField3, int2Field', 
        |    'TABLE_BLOCKSIZE'='64' 
        |  ) 
       """.stripMargin) 
    df3.write 
      .format("carbondata")   
      .option("tableName", "study_carbondata_test") 
      .option("compress", "true")  // just valid when tempCSV is true 
      .option("tempCSV", "false") 
      .option("single_pass", "true") 
      .mode(SaveMode.Append) 
      .save()

The error message: 
*java.lang.NegativeArraySizeException 
        at 
org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage.getRow(UnsafeCarbonRowPage.java:182)
 
        at 
org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder.readRow(UnsafeInmemoryHolder.java:63)
 
        at 
org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startSorting(UnsafeSingleThreadFinalSortFilesMerger.java:114)
 
        at 
org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startFinalMerge(UnsafeSingleThreadFinalSortFilesMerger.java:81)
 
        at 
org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl.sort(UnsafeParallelReadMergeSorterImpl.java:105)
 
        at 
org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl.execute(SortProcessorStepImpl.java:62)
 
        at 
org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl.execute(DataWriterProcessorStepImpl.java:87)
 
        at 
org.apache.carbondata.processing.newflow.DataLoadExecutor.execute(DataLoadExecutor.java:51)
 
        at 
org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD$$anon$2.<init>(NewCarbonDataLoadRDD.scala:442)
 
        at 
org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD.internalCompute(NewCarbonDataLoadRDD.scala:405)
 
        at 
org.apache.carbondata.spark.rdd.CarbonRDD.compute(CarbonRDD.scala:62) 
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)* 

Currently, the length of column was stored by short type.

Introduce new datatype of  varchar(size) to store column length more than short 
limit.


> Introduce new datatype of  varchar(size) to store column length more than 
> short limit.
> --------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-1625
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1625
>             Project: CarbonData
>          Issue Type: New Feature
>          Components: file-format
>            Reporter: Zhichao  Zhang
>            Priority: Minor
>
> I am using Spark 2.1 + CarbonData 1.2, and find that if 
> enable.unsafe.sort=true, the length of bytes of column exceed 32768, it will 
> load data unsuccessfully. 
> My test code: 
>     
> {code:java}
> val longStr = sb.toString()  // the getBytes length of longStr exceeds 32768 
>     println(longStr.length()) 
>     println(longStr.getBytes("UTF-8").length) 
>     
>     import spark.implicits._ 
>     val df1 = spark.sparkContext.parallelize(0 to 1000) 
>       .map(x => ("a", x.toString(), longStr, x, x.toLong, x * 2)) 
>       .toDF("stringField1", "stringField2", "stringField3", "intField", 
> "longField", "int2Field") 
>       
>     val df2 = spark.sparkContext.parallelize(1001 to 2000) 
>       .map(x => ("b", x.toString(), (x % 2).toString(), x, x.toLong, x * 2)) 
>       .toDF("stringField1", "stringField2", "stringField3", "intField", 
> "longField", "int2Field") 
>       
>     val df3 = df1.union(df2) 
>     val tableName = "study_carbondata_test" 
>     spark.sql(s"DROP TABLE IF EXISTS ${tableName} ").show() 
>     val sortScope = "LOCAL_SORT"   // LOCAL_SORT   GLOBAL_SORT 
>     spark.sql(s""" 
>         |  CREATE TABLE IF NOT EXISTS ${tableName} ( 
>         |    stringField1          string, 
>         |    stringField2          string, 
>         |    stringField3          string, 
>         |    intField              int, 
>         |    longField             bigint, 
>         |    int2Field             int 
>         |  ) 
>         |  STORED BY 'carbondata' 
>         |  TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2', 
>         |    'SORT_COLUMNS'='stringField1, stringField2, intField, 
> longField', 
>         |    'SORT_SCOPE'='${sortScope}', 
>         |    'NO_INVERTED_INDEX'='stringField3, int2Field', 
>         |    'TABLE_BLOCKSIZE'='64' 
>         |  ) 
>        """.stripMargin) 
>     df3.write 
>       .format("carbondata")   
>       .option("tableName", "study_carbondata_test") 
>       .option("compress", "true")  // just valid when tempCSV is true 
>       .option("tempCSV", "false") 
>       .option("single_pass", "true") 
>       .mode(SaveMode.Append) 
>       .save()
> {code}
> The error message: 
> {code:java}
> *java.lang.NegativeArraySizeException 
>         at 
> org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage.getRow(UnsafeCarbonRowPage.java:182)
>  
>         at 
> org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder.readRow(UnsafeInmemoryHolder.java:63)
>  
>         at 
> org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startSorting(UnsafeSingleThreadFinalSortFilesMerger.java:114)
>  
>         at 
> org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startFinalMerge(UnsafeSingleThreadFinalSortFilesMerger.java:81)
>  
>         at 
> org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl.sort(UnsafeParallelReadMergeSorterImpl.java:105)
>  
>         at 
> org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl.execute(SortProcessorStepImpl.java:62)
>  
>         at 
> org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl.execute(DataWriterProcessorStepImpl.java:87)
>  
>         at 
> org.apache.carbondata.processing.newflow.DataLoadExecutor.execute(DataLoadExecutor.java:51)
>  
>         at 
> org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD$$anon$2.<init>(NewCarbonDataLoadRDD.scala:442)
>  
>         at 
> org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD.internalCompute(NewCarbonDataLoadRDD.scala:405)
>  
>         at 
> org.apache.carbondata.spark.rdd.CarbonRDD.compute(CarbonRDD.scala:62) 
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)* 
> {code}
> Currently, the length of column was stored by short type.
> Introduce new datatype of  varchar(size) to store column length more than 
> short limit.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to