Vandana Yadav created CARBONDATA-2147:
-----------------------------------------

             Summary: Exception displays while loading data with streaming
                 Key: CARBONDATA-2147
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2147
             Project: CarbonData
          Issue Type: Bug
          Components: data-load
    Affects Versions: 1.3.0
         Environment: spark 2.1, spark 2.2.1
            Reporter: Vandana Yadav


Exception displays while loading data with streaming

Steps to reproduce:

1) start spark-shell:

./spark-shell --jars 
/opt/spark/spark-2.2.1/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar

2) Execute following script:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.streaming.\{ProcessingTime, StreamingQuery}

val carbon = SparkSession.builder().config(sc.getConf) 
.getOrCreateCarbonSession("hdfs://localhost:54310/newCarbonStore","/tmp")

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
 "FORCE")

carbon.sql("drop table if exists uniqdata_stream")

carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB 
timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 
bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 
decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 
int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('TABLE_BLOCKSIZE'= 
'256 MB', 'streaming'='true')");
import carbon.sqlContext.implicits._

import org.apache.spark.sql.types._
val uniqdataSch = StructType(
Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", 
StringType),StructField("DOB", TimestampType), StructField("DOJ", 
TimestampType), StructField("BIGINT_COLUMN1", LongType), 
StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", 
org.apache.spark.sql.types.DecimalType(30, 10)), StructField("DECIMAL_COLUMN2", 
org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1", 
DoubleType), StructField("Double_COLUMN2", DoubleType), 
StructField("INTEGER_COLUMN1", IntegerType)))

val streamDf = carbon.readStream
.schema(uniqdataSch)
.option("sep", ",")
.csv("file:///home/knoldus/Documents/uniqdata")
val qry = streamDf.writeStream.format("carbondata").trigger(ProcessingTime("5 
seconds"))
 .option("checkpointLocation","/stream/uniq")
 .option("dbName", "default")
 .option("tableName", "uniqdata_stream")
 .start()

 

3) Error logs:

warning: there was one deprecation warning; re-run with -deprecation for details
uniqdataSch: org.apache.spark.sql.types.StructType = 
StructType(StructField(CUST_ID,IntegerType,true), 
StructField(CUST_NAME,StringType,true), StructField(DOB,TimestampType,true), 
StructField(DOJ,TimestampType,true), StructField(BIGINT_COLUMN1,LongType,true), 
StructField(BIGINT_COLUMN2,LongType,true), 
StructField(DECIMAL_COLUMN1,DecimalType(30,10),true), 
StructField(DECIMAL_COLUMN2,DecimalType(36,10),true), 
StructField(Double_COLUMN1,DoubleType,true), 
StructField(Double_COLUMN2,DoubleType,true), 
StructField(INTEGER_COLUMN1,IntegerType,true))
streamDf: org.apache.spark.sql.DataFrame = [CUST_ID: int, CUST_NAME: string ... 
9 more fields]
qry: org.apache.spark.sql.streaming.StreamingQuery = 
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@d0e155c

scala> 18/02/08 16:38:53 ERROR StreamSegment: Executor task launch worker for 
task 5 Failed to append batch data to stream segment: 
hdfs://localhost:54310/newCarbonStore/default/uniqdata_stream1/Fact/Part0/Segment_0
java.lang.NullPointerException
 at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32)
 at 
org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331)
 at 
org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:108)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
18/02/08 16:38:53 ERROR Utils: Aborting task
java.lang.NullPointerException
 at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32)
 at 
org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331)
 at 
org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:108)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
18/02/08 16:38:53 ERROR CarbonAppendableStreamSink$: Executor task launch 
worker for task 5 Job job_20180208163853_0005 aborted.
18/02/08 16:38:53 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
org.apache.carbondata.streaming.CarbonStreamException: Task failed while 
writing rows
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:324)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:228)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:227)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:108)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
 at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:32)
 at 
org.apache.carbondata.streaming.parser.CSVStreamParserImp.parserRow(CSVStreamParserImp.java:40)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:337)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$InputIterator.next(CarbonAppendableStreamSink.scala:331)
 at 
org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:315)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:305)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
 at 
org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:317)
 ... 8 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to