[ 
https://issues.apache.org/jira/browse/CARBONDATA-2345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16437140#comment-16437140
 ] 

ocean commented on CARBONDATA-2345:
-----------------------------------

The stream source is a Parquet file.

The issue can be reproduced with the following code:

val tableName = "profile_carbondata_stream2"
 val pqtpath = "/test/stream"
 val warehouse = new File("./warehouse").getCanonicalPath
 val metastore = new File("./metastore").getCanonicalPath
 val spark = SparkSession
 .builder()
 .appName("StreamExample")
 .config("spark.sql.warehouse.dir", warehouse)
 .getOrCreateCarbonSession(warehouse, metastore)

 

val carbonTable = CarbonEnv.getCarbonTable(Some("default"), tableName)(spark)
 val tablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)

var qry: StreamingQuery = null
 val userSchema = spark.read.parquet(pqtpath).schema
 val readSocketDF = spark.readStream.schema(userSchema).parquet(pqtpath)

// Write data from the Parquet file stream to the carbondata table
 qry = readSocketDF.writeStream
 .format("carbondata")
 .trigger(ProcessingTime("20 seconds"))
 .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
 .option("dbName", "default")
 .option("tableName", tableName)
 .outputMode("append")
 .start()

 

qry.awaitTermination()

> "Task failed while writing rows" error occurs when streaming ingest into 
> carbondata table
> ------------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-2345
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2345
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load
>    Affects Versions: 1.3.1
>            Reporter: ocean
>            Priority: Major
>
> carbondata version: 1.3.1, spark: 2.2.1
> When using Spark Structured Streaming to ingest data into a carbondata table, 
> the following error occurs:
> warning: there was one deprecation warning; re-run with -deprecation for 
> details
> qry: org.apache.spark.sql.streaming.StreamingQuery = 
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7ddf193a
> [Stage 1:> (0 + 2) / 5]18/04/13 18:03:56 WARN TaskSetManager: Lost task 1.0 
> in stage 1.0 (TID 2, sz-pg-entanalytics-research-004.tendcloud.com, executor 
> 1): org.apache.carbondata.streaming.CarbonStreamException: Task failed while 
> writing rows
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:345)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:247)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:246)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:108)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.carbondata.processing.loading.BadRecordsLogger.addBadRecordsToBuilder(BadRecordsLogger.java:126)
>  at 
> org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl.convert(RowConverterImpl.java:164)
>  at 
> org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:186)
>  at 
> org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:336)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:326)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:326)
>  at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:338)
>  ... 8 more
> [Stage 1:===========> (1 + 2) / 5]18/04/13 18:03:57 ERROR TaskSetManager: 
> Task 0 in stage 1.0 failed 4 times; aborting job
> 18/04/13 18:03:57 ERROR CarbonAppendableStreamSink$: stream execution thread 
> for [id = 3abdadea-65f6-4d94-8686-306fccae4559, runId = 
> 689adf7e-a617-41d9-96bc-de075ce4dd73] Aborting job job_20180413180354_0000.
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
> stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 
> (TID 11, sz-pg-entanalytics-research-004.tendcloud.com, executor 1): 
> org.apache.carbondata.streaming.CarbonStreamException: Task failed while 
> writing rows
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:345)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:247)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileJob$1$$anonfun$apply$mcV$sp$1.apply(CarbonAppendableStreamSink.scala:246)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>  at org.apache.spark.scheduler.Task.run(Task.scala:108)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.carbondata.processing.loading.BadRecordsLogger.addBadRecordsToBuilder(BadRecordsLogger.java:126)
>  at 
> org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl.convert(RowConverterImpl.java:164)
>  at 
> org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:186)
>  at 
> org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:244)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:336)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:326)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:326)
>  at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
>  at 
> org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:338)
>  ... 8 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to