[
https://issues.apache.org/jira/browse/SPARK-18737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15783543#comment-15783543
]
Josh Bacon commented on SPARK-18737:
------------------------------------
Hi Sean,
We've performed more tests and are experiencing the same issue with the
following minimal reproduction (Spark 2.0.2 with prebuilt Hadoop 2.7):
{code:title=StreamingFromKinesisTest.scala|borderStyle=solid}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

object StreamingFromKinesisTest {
  def main(args: Array[String]): Unit = {
    val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val streamName = args(0)
    val appName = args(1) // also used as the KCL checkpoint table name in DynamoDB
    val region = "us-west-2"
    val sparkSession = SparkSession.builder.appName("StreamingFromKinesisTest").getOrCreate()
    val batchInterval = Seconds(10)
    val streamingContext = new StreamingContext(sparkSession.sparkContext, batchInterval)
    // Create two Kinesis receivers and union them into a single DStream below.
    val kinesisStreams = (0 until 2).map { _ =>
      KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, region,
        InitialPositionInStream.TRIM_HORIZON, batchInterval, StorageLevel.MEMORY_AND_DISK_2)
    }
    val streamOfArrayBytes = streamingContext.union(kinesisStreams)
    val streamStrings = streamOfArrayBytes.map(arrayBytes => new String(arrayBytes))
    streamStrings.foreachRDD { (rddString, timestamp) =>
      println(timestamp)
      if (!rddString.isEmpty()) {
        println("Success!")
      }
    }
    streamingContext.start()
    streamingContext.awaitTerminationOrTimeout(6000000) // 100 minutes
  }
}
{code}
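Note the reproduction above does not touch spark.serializer at all. For reference, a sketch of how the serializer would be pinned explicitly (the workaround attempted in this ticket); this is not part of the repro and assumes no conflicting --conf flags at submit time:
{code:title=Sketch: pinning the serializer|borderStyle=solid}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch only. SparkConf values are strings, so the boolean must be passed
// as "false"; the serializer must be set before the first
// SparkContext/SparkSession is created or it has no effect.
val conf = new SparkConf()
  .setAppName("StreamingFromKinesisTest")
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  .set("spark.kryo.registrationRequired", "false")
val sparkSession = SparkSession.builder.config(conf).getOrCreate()
{code}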
{panel:title=Executor Log Snippet|borderStyle=dashed|borderColor=#ccc|titleBGColor=#F7D6C1|bgColor=#FFFFCE}
16/12/28 11:02:40 INFO BlockManager: Removing RDD 15
16/12/28 11:02:40 INFO BlockManager: Removing RDD 13
16/12/28 11:02:40 INFO BlockManager: Removing RDD 14
16/12/28 11:02:53 INFO CoarseGrainedExecutorBackend: Got assigned task 72
16/12/28 11:02:53 INFO Executor: Running task 0.0 in stage 4.0 (TID 72)
16/12/28 11:02:53 INFO TorrentBroadcast: Started reading broadcast variable 4
16/12/28 11:02:53 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1762.0 B, free 366.3 MB)
16/12/28 11:02:53 INFO TorrentBroadcast: Reading broadcast variable 4 took 10 ms
16/12/28 11:02:53 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.6 KB, free 366.3 MB)
16/12/28 11:02:53 INFO TransportClientFactory: Successfully created connection to /172.21.50.111:5000 after 22 ms (21 ms spent in bootstraps)
16/12/28 11:02:54 INFO BlockManager: Found block input-1-1482951722353 remotely
16/12/28 11:02:54 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 72)
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/12/28 11:02:55 INFO CoarseGrainedExecutorBackend: Got assigned task 74
16/12/28 11:02:55 INFO Executor: Running task 0.2 in stage 4.0 (TID 74)
16/12/28 11:02:56 INFO BlockManager: Found block input-1-1482951722353 remotely
16/12/28 11:02:56 ERROR Executor: Exception in task 0.2 in stage 4.0 (TID 74)
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
{panel}
The streaming job begins to fail on what appear to be random batches. The job
may run cleanly on one submission, and then, after the driver is stopped and
the job is resubmitted, the KryoExceptions may start appearing.
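To help narrow down where the Kryo path is still active, here is a sketch (not something from the runs above) that asks each executor which serializer its SparkEnv actually instantiated. SparkEnv is Spark's @DeveloperApi, so this is for debugging only:
{code:title=Sketch: report the executors' serializer|borderStyle=solid}
import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("SerializerCheck").getOrCreate()
// Run a tiny job; the map executes on the executors, so SparkEnv.get
// resolves to each executor's environment, not the driver's.
val serializers = spark.sparkContext
  .parallelize(1 to 4, 4)
  .map(_ => SparkEnv.get.serializer.getClass.getName)
  .distinct()
  .collect()
serializers.foreach(println)
{code}
If the executors report KryoSerializer even with spark.serializer set to the JavaSerializer, that would point at the setting being dropped for this path rather than at anything Kinesis-specific.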
> Serialization setting "spark.serializer" ignored in Spark 2.x
> -------------------------------------------------------------
>
> Key: SPARK-18737
> URL: https://issues.apache.org/jira/browse/SPARK-18737
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Dr. Michael Menzel
>
> The following exception occurs although the JavaSerializer has been activated:
> 16/11/22 10:49:24 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 77, ip-10-121-14-147.eu-central-1.compute.internal, partition 1, RACK_LOCAL, 5621 bytes)
> 16/11/22 10:49:24 INFO YarnSchedulerBackend$YarnDriverEndpoint: Launching task 77 on executor id: 2 hostname: ip-10-121-14-147.eu-central-1.compute.internal.
> 16/11/22 10:49:24 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on ip-10-121-14-147.eu-central-1.compute.internal:45059 (size: 879.0 B, free: 410.4 MB)
> 16/11/22 10:49:24 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 77, ip-10-121-14-147.eu-central-1.compute.internal): com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
> 	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
> 	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
> 	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> 	at org.apache.spark.util.NextIterator.to(NextIterator.scala:21)
> 	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> 	at org.apache.spark.util.NextIterator.toBuffer(NextIterator.scala:21)
> 	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
> 	at org.apache.spark.util.NextIterator.toArray(NextIterator.scala:21)
> 	at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:927)
> 	at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:927)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> The code runs perfectly with Spark 1.6.0. Since we moved to 2.0.0 and now
> 2.0.1, we see the Kryo deserialization exception, and over time the Spark
> streaming job stops processing because too many tasks have failed.
> Our action was to use conf.set("spark.serializer",
> "org.apache.spark.serializer.JavaSerializer") and to disable Kryo class
> registration with conf.set("spark.kryo.registrationRequired", "false"). We
> hope to identify the root cause of the exception.
> However, setting the serializer to JavaSerializer is obviously ignored by
> the Spark internals. Despite the setting, we still see the exception printed
> in the log and tasks fail. The occurrence seems to be non-deterministic, but
> to become more frequent over time.
> Several questions we could not answer during our troubleshooting:
> 1. How can the debug log for Kryo be enabled? -- We tried following the
> minlog documentation, but no output can be found.
> 2. Is the serializer setting effective for Spark-internal serialization? How
> can the JavaSerializer be forced on internal serialization for
> worker-to-driver communication?
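On question 1 above: Kryo logs through the MinLog library, so one way to try enabling its debug output is to raise MinLog's level programmatically. This is a sketch; whether the minlog jar on Spark's classpath supports runtime levels (the stripped "production" build does not) is an assumption:
{code:title=Sketch: enabling Kryo debug logging|borderStyle=solid}
import com.esotericsoftware.minlog.Log

// Must run before any Kryo (de)serialization happens, and on every JVM
// whose Kryo activity should be logged, i.e. executors as well as the driver.
Log.set(Log.LEVEL_DEBUG)
{code}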