Jayesh lalwani created SPARK-19738:
--------------------------------------

             Summary: Consider adding error handler to DataStreamWriter
                 Key: SPARK-19738
                 URL: https://issues.apache.org/jira/browse/SPARK-19738
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 2.1.0
            Reporter: Jayesh lalwani


For Structured Streaming applications, it is important that they stay always-on. However, right now, errors stop the driver. In some cases this is not desirable behavior. For example, I have the following application:

{code}
import org.apache.spark.sql.types._
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
csvDF.writeStream.format("console").start()
{code}

I send it the following input:

{quote}
1,Iron man
2,Superman
{quote}

Obviously, the data is bad: the second column cannot be parsed as an integer. This causes the executor to throw an exception that propagates to the driver, which promptly shuts down. The driver is running in supervised mode, so it gets restarted, reads the same bad input, and shuts down again, ad infinitum. This behavior is desirable in cases where the error is recoverable. For example, if the executor cannot talk to the database, we want the application to keep retrying the same input until the database recovers. However, in other cases this behavior is undesirable; we do not want it to happen when the input is bad. Instead, we might want to put the bad record in some sort of dead-letter queue, or kill the driver only when the number of errors has crossed a certain threshold, or email someone.

Proposal:

Add an error handler to the data stream. When the executor fails, it should call the error handler and pass it the exception. The error handler could swallow the exception, transform it, update counts in an accumulator, etc.

{code}
import org.apache.spark.sql.types._
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
csvDF.writeStream.format("console").errorHandler("com.jayesh.ErrorHandler").start()
{code}
{code}
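To make the proposal concrete, here is a minimal sketch of what the handler contract could look like. Note that neither an {{ErrorHandler}} trait nor an {{errorHandler()}} builder method exists in Spark today; the names, the boolean return convention, and the threshold example below are all hypothetical.

{code}
// Hypothetical contract: Spark would invoke this on the driver (or
// executor) when a streaming batch fails, instead of immediately
// stopping the query.
trait ErrorHandler extends Serializable {
  // Return true to swallow the error and keep the query running,
  // false to let it propagate and stop the driver as it does today.
  def handle(e: Throwable): Boolean
}

// Example policy: tolerate errors up to a threshold, then give up.
class ThresholdErrorHandler(maxErrors: Int) extends ErrorHandler {
  private var errorCount = 0
  override def handle(e: Throwable): Boolean = {
    errorCount += 1
    errorCount <= maxErrors
  }
}
{code}

A handler like this could also route the failing record to a dead-letter queue or send a notification before deciding whether to swallow the exception.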



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
