I would recommend using the Spark Cassandra Connector instead of the Hadoop-based writers. The Hadoop code has not had much love in a long time. See
https://github.com/datastax/spark-cassandra-connector

On Wed, Apr 3, 2019 at 12:21 PM Brett Marcott <brett.marc...@gmail.com> wrote:

> Hi folks,
>
> I am noticing my Spark jobs getting stuck when using the
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter/CqlBulkOutputFormat.
>
> Based on the code, it seems that an infinite loop may be the expected
> behavior whenever there is a stream failure.
>
> Here are the logs from one executor:
> 19/04/03 15:35:06 INFO streaming.StreamResultFuture: [Stream #59290530-5625-11e9-a2bb-8bc7b49d56b0] Session with /10.82.204.173 is complete
> 19/04/03 15:35:06 WARN streaming.StreamResultFuture: [Stream #59290530-5625-11e9-a2bb-8bc7b49d56b0] Stream failed
>
> On stream failure, StreamResultFuture appears to set the exception on the
> AbstractFuture. AFAIK this should cause the AbstractFuture to throw a new
> ExecutionException on every call to get().
>
> The problem seems to be that CqlBulkRecordWriter swallows the
> ExecutionException and continues in a while loop:
>
> https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274
>
> When taking consecutive thread dumps of the same process, I see that the
> only thread doing work is constantly creating new ExecutionExceptions (the
> memory address of the ExecutionException was different in each thread dump):
>
> java.lang.Throwable.fillInStackTrace(Native Method)
> java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding Monitor(java.util.concurrent.ExecutionException@80240763})
> java.lang.Throwable.<init>(Throwable.java:310)
> java.lang.Exception.<init>(Exception.java:102)
> java.util.concurrent.ExecutionException.<init>(ExecutionException.java:90)
> com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476)
> com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:357)
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:257)
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:237)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1131)
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1359)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> org.apache.spark.scheduler.Task.run(Task.scala:99)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:285)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
>
> It seems the logic right below the while loop in the code linked above,
> which checks for failed hosts/stream sessions, should perhaps have been
> inside the while loop?
>
> Thanks,
>
> Brett
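The failure mode described in the quoted message can be illustrated with a minimal standalone sketch. It does not use any Cassandra classes and is not the actual Cassandra fix: `StreamWait` and `awaitStream` are hypothetical names, and a plain `CompletableFuture` stands in for the streaming future. The idea is that a `TimeoutException` (stream still running) and an `ExecutionException` (stream already failed) need different handling. Only a timeout should lead to another poll, because calling `get()` on a failed future just throws a fresh `ExecutionException` every time, which matches the thread dumps above.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

final class StreamWait {
    // Hypothetical helper: waits for a streaming future, reporting progress
    // on each timeout, but fails fast once the future has failed.
    static <T> T awaitStream(Future<T> future, long pollMillis, Runnable progress)
            throws IOException {
        while (true) {
            try {
                return future.get(pollMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException te) {
                // Still streaming: send a heartbeat so the task is not killed, then poll again.
                progress.run();
            } catch (ExecutionException ee) {
                // The future is done and failed; polling again would only create
                // another ExecutionException, so surface the failure instead.
                throw new IOException("Stream failed", ee.getCause());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException(ie);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger heartbeats = new AtomicInteger();

        // Simulates a stream that finishes after roughly 50 ms: the loop polls,
        // reports progress on each timeout, then returns the result.
        CompletableFuture<String> slow = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                // Complete anyway; this is only a demo.
            }
            slow.complete("done");
        }).start();
        System.out.println(awaitStream(slow, 10, heartbeats::incrementAndGet));

        // Simulates a failed stream: instead of spinning forever, the first
        // poll surfaces the failure as an IOException.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated stream failure"));
        try {
            awaitStream(failed, 10, heartbeats::incrementAndGet);
        } catch (IOException e) {
            System.out.println("failed fast: " + e.getCause().getMessage());
        }
    }
}
```

Running `main` prints `done` followed by `failed fast: simulated stream failure`. Whether the real writer should also check failed hosts inside the loop, as suggested above, is a separate design question this sketch does not settle.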