[
https://issues.apache.org/jira/browse/CARBONDATA-1625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhichao Zhang updated CARBONDATA-1625:
---------------------------------------
Description:
I am using Spark 2.1 + CarbonData 1.2 and find that if
enable.unsafe.sort=true and the byte length of a column value exceeds 32768,
data loading fails.
My test code:
{code:java}
val sb = new StringBuilder // e.g. build a string whose UTF-8 byte length exceeds 32768
(0 until 40000).foreach(_ => sb.append('a'))
val longStr = sb.toString() // the getBytes length of longStr exceeds 32768
println(longStr.length())
println(longStr.getBytes("UTF-8").length)
import org.apache.spark.sql.SaveMode
import spark.implicits._
val df1 = spark.sparkContext.parallelize(0 to 1000)
.map(x => ("a", x.toString(), longStr, x, x.toLong, x * 2))
.toDF("stringField1", "stringField2", "stringField3", "intField",
"longField", "int2Field")
val df2 = spark.sparkContext.parallelize(1001 to 2000)
.map(x => ("b", x.toString(), (x % 2).toString(), x, x.toLong, x * 2))
.toDF("stringField1", "stringField2", "stringField3", "intField",
"longField", "int2Field")
val df3 = df1.union(df2)
val tableName = "study_carbondata_test"
spark.sql(s"DROP TABLE IF EXISTS ${tableName} ").show()
val sortScope = "LOCAL_SORT" // LOCAL_SORT GLOBAL_SORT
spark.sql(s"""
| CREATE TABLE IF NOT EXISTS ${tableName} (
| stringField1 string,
| stringField2 string,
| stringField3 string,
| intField int,
| longField bigint,
| int2Field int
| )
| STORED BY 'carbondata'
| TBLPROPERTIES('DICTIONARY_INCLUDE'='stringField1, stringField2',
| 'SORT_COLUMNS'='stringField1, stringField2, intField, longField',
| 'SORT_SCOPE'='${sortScope}',
| 'NO_INVERTED_INDEX'='stringField3, int2Field',
| 'TABLE_BLOCKSIZE'='64'
| )
""".stripMargin)
df3.write
.format("carbondata")
.option("tableName", "study_carbondata_test")
.option("compress", "true") // just valid when tempCSV is true
.option("tempCSV", "false")
.option("single_pass", "true")
.mode(SaveMode.Append)
.save()
{code}
The error message:
{code:java}
java.lang.NegativeArraySizeException
    at org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage.getRow(UnsafeCarbonRowPage.java:182)
    at org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryHolder.readRow(UnsafeInmemoryHolder.java:63)
    at org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startSorting(UnsafeSingleThreadFinalSortFilesMerger.java:114)
    at org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger.startFinalMerge(UnsafeSingleThreadFinalSortFilesMerger.java:81)
    at org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl.sort(UnsafeParallelReadMergeSorterImpl.java:105)
    at org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl.execute(SortProcessorStepImpl.java:62)
    at org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl.execute(DataWriterProcessorStepImpl.java:87)
    at org.apache.carbondata.processing.newflow.DataLoadExecutor.execute(DataLoadExecutor.java:51)
    at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD$$anon$2.<init>(NewCarbonDataLoadRDD.scala:442)
    at org.apache.carbondata.spark.rdd.NewDataFrameLoaderRDD.internalCompute(NewCarbonDataLoadRDD.scala:405)
    at org.apache.carbondata.spark.rdd.CarbonRDD.compute(CarbonRDD.scala:62)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
{code}
Currently, the byte length of a column value is stored as a short. Introduce a
new datatype, varchar(size), to support column values longer than the short
limit.
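For reference, here is a minimal standalone Java sketch (illustration only, not
CarbonData code; the class and variable names are invented) of how a length
stored as a 2-byte short wraps negative once the actual byte length passes
32767, which is consistent with the NegativeArraySizeException above:
{code:java}
// Illustration only: a 2-byte short cannot represent lengths >= 32768.
public class ShortLengthOverflowDemo {
  public static void main(String[] args) {
    int actualLength = 40000;                  // byte length of a long string column value
    short storedLength = (short) actualLength; // persisting the length as a short wraps it
    System.out.println(storedLength);          // prints -25536
    byte[] buffer = new byte[storedLength];    // throws java.lang.NegativeArraySizeException
  }
}
{code}
Storing the length in a wider type (for example an int), as a varchar(size)
column could do, avoids the wrap-around.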
> Introduce new datatype of varchar(size) to store column length more than
> short limit.
> --------------------------------------------------------------------------------------
>
> Key: CARBONDATA-1625
> URL: https://issues.apache.org/jira/browse/CARBONDATA-1625
> Project: CarbonData
> Issue Type: New Feature
> Components: file-format
> Reporter: Zhichao Zhang
> Priority: Minor
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)