Hi list,

I have Spark 2.2.0 in standalone mode and Python 3.6. It is a very small
testing cluster with two nodes.
I am running (or trying to run) a streaming job that simply reads from Kafka,
applies an ML model, and writes the results back to Kafka.
The job is run with the following parameters:
"--conf spark.cores.max=2 --conf spark.executor.cores=2 --conf spark.executor.memory=2g"
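For reference, here is a small Python sanity check (not part of the job) that parses the flags above and works out how many executors they allow. It assumes the usual standalone-mode rule that `spark.cores.max` caps the total cores the application takes across the cluster and that each executor claims `spark.executor.cores` of them. If I read that rule correctly, these settings give at most one executor with 2 cores and 2g of memory, so only one of the two nodes runs an executor. `parse_conf_args` and `max_executors` are illustrative helper names.

```python
def parse_conf_args(arg_string):
    """Turn a string of `--conf key=value` pairs into a dict."""
    tokens = arg_string.split()
    conf = {}
    # Tokens alternate: "--conf", "key=value", "--conf", "key=value", ...
    for flag, pair in zip(tokens[0::2], tokens[1::2]):
        if flag != "--conf":
            raise ValueError("expected --conf, got %r" % flag)
        key, _, value = pair.partition("=")
        conf[key] = value
    return conf


def max_executors(conf):
    # Standalone mode (assumed rule): spark.cores.max caps the application's cores
    # across the whole cluster, and each executor claims spark.executor.cores of them.
    return int(conf["spark.cores.max"]) // int(conf["spark.executor.cores"])


ARGS = ("--conf spark.cores.max=2 --conf spark.executor.cores=2 "
        "--conf spark.executor.memory=2g")
conf = parse_conf_args(ARGS)
print(max_executors(conf), conf["spark.executor.memory"])  # 1 2g
```

Raising `spark.executor.cores` without also raising `spark.cores.max` would not add executors under this rule.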

The problem I'm facing is that the job very often crashes with this exception:

17/08/05 00:19:00 ERROR Utils: Uncaught exception in thread stdout writer for /opt/spark/miniconda2/envs/pyspark36/bin/python
java.lang.AssertionError: assertion failed: Block rdd_474_0 is not locked for reading
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
    at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
    at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/08/05 00:19:00 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/spark/miniconda2/envs/pyspark36/bin/python,5,main]
java.lang.AssertionError: assertion failed: Block rdd_474_0 is not locked for reading
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
    at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
    at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

The stream is created via:
directKafkaStream = KafkaUtils.createDirectStream(ssc,...

The processing:
directKafkaStream.cache().foreachRDD(self._process)

where self._process:

   - converts the RDD into a DataFrame
   - applies model.transform
   - writes the result back to Kafka
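A minimal sketch of what a `_process` step like the one described above might look like in PySpark. It is not the actual job code, and several parts are hypothetical: it assumes the Kafka values are JSON strings with the columns the model expects, that `model` is a fitted `pyspark.ml` model (anything with `.transform`), and that the write-back uses the Kafka batch sink available since Spark 2.2 (which needs the `spark-sql-kafka-0-10` package). `make_process`, `kafka_writer`, the `prediction` column and the server/topic arguments are illustrative names only.

```python
def make_process(spark, model, write_batch):
    """Return a foreachRDD callback: Kafka RDD -> DataFrame -> model.transform -> write.

    Hypothetical assumptions (not from the original job):
      * Kafka record values are JSON strings with the columns the model expects;
      * `model` is a fitted pyspark.ml model or PipelineModel;
      * `write_batch` sends the scored DataFrame back to Kafka.
    """
    def _process(time, rdd):
        # Skip empty micro-batches instead of building an empty DataFrame.
        if rdd.isEmpty():
            return
        # The direct stream yields (key, value) pairs; keep only the JSON value
        # and let Spark infer the schema from the JSON strings.
        df = spark.read.json(rdd.map(lambda kv: kv[1]))
        scored = model.transform(df)
        write_batch(scored)

    return _process


def kafka_writer(servers, topic):
    """Return a write_batch function that uses the Kafka batch sink (Spark 2.2+).

    `servers` and `topic` are placeholders; only the model's `prediction`
    column is sent, as the Kafka record value.
    """
    def write_batch(df):
        (df.selectExpr("CAST(prediction AS STRING) AS value")
           .write
           .format("kafka")
           .option("kafka.bootstrap.servers", servers)
           .option("topic", topic)
           .save())

    return write_batch
```

The callback would be registered the same way as in the job, by passing `make_process(spark, model, kafka_writer(servers, topic))` to `foreachRDD`. Passing the session, model and writer in as arguments also makes the callback easy to exercise with stand-in objects outside the cluster.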

Has anyone experienced this?
Any suggestions on how to attack the problem?
I am not sure it is a resource constraint, as I tried raising cores and memory
with no luck.

Any hints are much appreciated,
