I would recommend using the Spark Cassandra Connector instead of the
Hadoop-based writers. The Hadoop code has not had a lot of love in a long
time. See

https://github.com/datastax/spark-cassandra-connector
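
For instance, a bulk write with the connector's Java API looks roughly like
this (my_keyspace, the people table, and the Person bean are placeholder
names; the connector reads the cluster address from
spark.cassandra.connection.host):

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import org.apache.spark.api.java.JavaRDD;

public class ConnectorWriteExample
{
    // Placeholder bean; field names must match the table's column names.
    public static class Person
    {
        private Integer id;
        private String name;
        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static void save(JavaRDD<Person> people)
    {
        // The connector handles batching, retries and token-aware routing,
        // so none of the Hadoop OutputFormat plumbing is needed.
        javaFunctions(people)
            .writerBuilder("my_keyspace", "people", mapToRow(Person.class))
            .saveToCassandra();
    }
}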

On Wed, Apr 3, 2019 at 12:21 PM Brett Marcott <brett.marc...@gmail.com>
wrote:

> Hi folks,
>
> I am noticing my spark jobs being stuck when using the
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter/CqlBulkOutputFormat.
>
>
> It seems that, based on the code, an infinite loop may actually be the
> expected behavior whenever there is a stream failure.
>
> Here are one executor's logs:
> 19/04/03 15:35:06 INFO streaming.StreamResultFuture: [Stream
> #59290530-5625-11e9-a2bb-8bc7b49d56b0] Session with /10.82.204.173 is
> complete
> 19/04/03 15:35:06 WARN streaming.StreamResultFuture: [Stream
> #59290530-5625-11e9-a2bb-8bc7b49d56b0] Stream failed
>
>
> On stream failure, it seems StreamResultFuture sets the exception on the
> AbstractFuture.
> AFAIK this should cause every subsequent AbstractFuture.get() call to throw
> a new ExecutionException.
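>
> A minimal standalone demonstration of that Guava behavior (plain Java, not
> Cassandra code, just my understanding of AbstractFuture):
>
> import com.google.common.util.concurrent.SettableFuture;
> import java.util.concurrent.ExecutionException;
>
> public class FailedFutureDemo
> {
>     public static void main(String[] args) throws InterruptedException
>     {
>         SettableFuture<Void> streamResult = SettableFuture.create();
>         streamResult.setException(new RuntimeException("Stream failed"));
>
>         for (int i = 0; i < 3; i++)
>         {
>             try
>             {
>                 streamResult.get(); // never succeeds once the exception is set
>             }
>             catch (ExecutionException e)
>             {
>                 // A fresh ExecutionException is allocated on every get()
>                 // call, matching the changing addresses in the thread
>                 // dumps below.
>                 System.out.println(e + " @" + Integer.toHexString(System.identityHashCode(e)));
>             }
>         }
>     }
> }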
>
> The problem seems to lie in the fact that the CqlBulkRecordWriter swallows
> the ExecutionException and continues in a while loop:
>
> https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274
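>
> In other words, the close() loop looks roughly like this (paraphrased from
> the linked code, not an exact copy; future and progress are fields of the
> writer):
>
> while (true)
> {
>     try
>     {
>         future.get(1000, TimeUnit.MILLISECONDS);
>         break; // only reached if the streaming future completes successfully
>     }
>     catch (ExecutionException | TimeoutException te)
>     {
>         // A permanently failed future is treated exactly like a timeout:
>         // report progress and retry. Since get() keeps throwing a new
>         // ExecutionException forever, the loop never exits.
>         if (progress != null)
>             progress.progress();
>     }
>     catch (InterruptedException ie)
>     {
>         throw new IOException(ie);
>     }
> }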
>
> When taking consecutive thread dumps of the same process, I see that the
> only thread doing work is constantly creating new ExecutionExceptions (the
> memory location of the ExecutionException was different in each thread dump):
> java.lang.Throwable.fillInStackTrace(Native Method)
> java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding
> Monitor(java.util.concurrent.ExecutionException@80240763})
> java.lang.Throwable.<init>(Throwable.java:310)
> java.lang.Exception.<init>(Exception.java:102)
> java.util.concurrent.ExecutionException.<init>(ExecutionException.java:90)
>
> com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476)
>
> com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:357)
>
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:257)
>
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:237)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1131)
>
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1359)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> org.apache.spark.scheduler.Task.run(Task.scala:99)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:285)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
>
> It seems the logic that sits right below the while loop in the code linked
> above, which checks for failed hosts/stream sessions, should perhaps have
> been inside the while loop, so a failed stream actually breaks out of it?
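>
> A hypothetical, untested sketch of what I mean, so that a permanent failure
> escapes the loop and actually reaches that check:
>
> while (true)
> {
>     try
>     {
>         future.get(1000, TimeUnit.MILLISECONDS);
>         break;
>     }
>     catch (ExecutionException ee)
>     {
>         // Permanent failure: stop retrying so the failed-host /
>         // stream-session check after the loop can run and report it.
>         break;
>     }
>     catch (TimeoutException te)
>     {
>         // Still streaming: report progress and keep waiting.
>         if (progress != null)
>             progress.progress();
>     }
>     catch (InterruptedException ie)
>     {
>         throw new IOException(ie);
>     }
> }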
>
> Thanks,
>
> Brett
