Jayesh lalwani created SPARK-19738:
--------------------------------------
Summary: Consider adding error handler to DataStreamWriter
Key: SPARK-19738
URL: https://issues.apache.org/jira/browse/SPARK-19738
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Jayesh lalwani
For Structured Streaming implementations, it is important that the application
stays always on. However, right now, any error stops the driver. In some cases,
this is not desirable behavior. For example, I have the following application
{code}
import org.apache.spark.sql.types._
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF =
spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
csvDF.writeStream.format("console").start()
{code}
I send the following input to it
{quote}
1,Iron man
2,Superman
{quote}
Obviously, the data is bad: "Iron man" cannot be parsed as an integer age. This
causes the executor to throw an exception that propagates to the driver, which
promptly shuts down. The driver is running in supervised mode, so it gets
restarted. The application reads the same bad input and shuts down again. This
goes on ad infinitum. This behavior is desirable in cases where the error is
recoverable. For example, if the executor cannot talk to the database, we want
the application to keep retrying the same input until the database recovers.
However, in other cases this behavior is undesirable. We do not want it to
happen when the input is bad. Instead, we might want to put the bad record in
some sort of dead letter queue, or kill the driver only when the number of
errors has crossed a certain threshold, or email someone.
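As a point of comparison, the CSV source already offers a blunt per-record knob through its {{mode}} option (DROPMALFORMED skips rows that do not match the schema). It keeps the stream running, but it silently discards the bad records rather than routing them anywhere:

{code}
import org.apache.spark.sql.types._

val userSchema = new StructType().add("name", "string").add("age", "integer")

// DROPMALFORMED silently drops rows that fail to parse against the schema,
// so the query keeps running, but the bad records are simply lost.
val csvDF = spark.readStream
  .schema(userSchema)
  .option("mode", "DROPMALFORMED")
  .csv("s3://bucket/jayesh/streamingerror/")
{code}

This avoids the restart loop, but gives no visibility into what was dropped, which is why a pluggable handler would still be valuable.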
Proposal:
Add an error handler to the data stream. When the executor fails, it should
call the error handler and pass it the Exception. The error handler could
swallow the exception, transform it, update counts in an accumulator, etc.
{code}
import org.apache.spark.sql.types._
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF =
spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
csvDF.writeStream.format("console").errorHandler("com.jayesh.ErrorHandler").start()
{code}
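To make the proposal concrete, here is a minimal sketch of what the handler contract could look like. All names here are hypothetical; nothing like this exists in Spark today:

{code}
// Hypothetical contract; would need to be Serializable to ship to executors.
trait ErrorHandler extends Serializable {
  def handle(e: Throwable): Unit
}

// Example policy: tolerate up to maxErrors bad records, then fail the query.
class ThresholdErrorHandler(maxErrors: Int) extends ErrorHandler {
  private var count = 0
  override def handle(e: Throwable): Unit = {
    count += 1
    // Past the threshold, rethrow so the query fails as it does today.
    if (count > maxErrors) throw e
    // Otherwise swallow; a real handler could also log, email,
    // or write the offending record to a dead letter queue.
  }
}
{code}

A dead-letter-queue policy would simply be another implementation of the same trait, persisting the bad record somewhere durable instead of rethrowing.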
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)