[https://issues.apache.org/jira/browse/LIVY-720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17054645#comment-17054645]
zhouh2 commented on LIVY-720:
-----------------------------
Hi, [~steviej08]. I've run into this problem before. See
[https://github.com/scala/bug/issues/9584], which describes corruption of the
Nil singleton. Indeed, I found the same serialization problem in our code.
When we pass a closure to Livy that contains Scala's List type,
org.apache.livy.shaded.kryo.kryo.Kryo, which is written in Java and does not
handle Scala's singleton objects specially, creates a new Nil instance while
deserializing the List, so the Nil singleton gets corrupted. I recommend
capturing the stack for calls to the constructor
scala.collection.immutable.Nil$.<init> and seeing where it is being invoked.
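To illustrate, here is a minimal sketch of the corruption, assuming the
standalone com.esotericsoftware.kryo artifact is on the classpath (Livy's
shaded org.apache.livy.shaded.kryo.kryo.Kryo exposes the same API). A
Scala-unaware Kryo round trip can reflectively create a second Nil$ instance
instead of returning the singleton:
{code:scala}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

object NilCorruptionSketch {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)

    // Serialize an empty List, i.e. the Nil singleton.
    val bytes = new ByteArrayOutputStream()
    val output = new Output(bytes)
    kryo.writeClassAndObject(output, List.empty[String])
    output.close()

    // Deserialize it. Without a Scala-aware serializer, Kryo may build a
    // brand-new Nil$ instance reflectively rather than return the singleton.
    val input = new Input(new ByteArrayInputStream(bytes.toByteArray))
    val roundTripped = kryo.readClassAndObject(input)
    input.close()

    // false here means reference equality with Nil is broken, which is what
    // trips up code (like scala-reflect) that relies on the singleton.
    println(roundTripped eq Nil)
  }
}
{code}
The usual fix is to register a serializer for Nil.getClass (and other Scala
singletons) that always returns the canonical instance, as Scala-aware Kryo
wrappers such as Twitter's chill do.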
> NoSuchElementException caused when reading from hdfs submitted via livy
> programmatic api
> -----------------------------------------------------------------------------------------
>
> Key: LIVY-720
> URL: https://issues.apache.org/jira/browse/LIVY-720
> Project: Livy
> Issue Type: Bug
> Components: RSC
> Affects Versions: 0.6.0
> Environment: Using a docker container on windows 10:
> https://hub.docker.com/r/cheathwood/hadoop-spark-livy
> Reporter: Stephen Jenkins
> Priority: Blocker
> Attachments: FailingLivySparkJob.zip
>
>
> Hi,
>
> I've been using the Livy programmatic api to submit spark jobs written in
> scala and I've run into a strange issue. I'm using case classes to wrap the
> parameters I want to send over to spark, then within the job I manipulate
> them for use in different parts of the job. However, it seems that whenever
> I try to read and collect data from hdfs I get the following error:
> {code:java}
> java.util.NoSuchElementException: head of empty list
> at scala.collection.immutable.Nil$.head(List.scala:420)
> at scala.collection.immutable.Nil$.head(List.scala:417)
> at scala.collection.immutable.List.map(List.scala:277)
> at scala.reflect.internal.Symbols$Symbol.parentSymbols(Symbols.scala:2117)
> at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:301)
> at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:341)
> at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply$mcV$sp(SymbolLoaders.scala:74)
> at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
> at scala.reflect.runtime.SymbolLoaders$LazyPackageType$$anonfun$complete$2.apply(SymbolLoaders.scala:71)
> at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:263)
> at scala.reflect.runtime.SymbolLoaders$LazyPackageType.complete(SymbolLoaders.scala:71)
> at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
> at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:174)
> at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
> at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
> at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
> at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
> at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
> at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:174)
> at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
> at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1.info(SynchronizedSymbols.scala:174)
> at scala.reflect.internal.Types$TypeRef.thisInfo(Types.scala:2194)
> at scala.reflect.internal.Types$TypeRef.baseClasses(Types.scala:2199)
> at scala.reflect.internal.tpe.FindMembers$FindMemberBase.<init>(FindMembers.scala:17)
> at scala.reflect.internal.tpe.FindMembers$FindMember.<init>(FindMembers.scala:219)
> at scala.reflect.internal.Types$Type.scala$reflect$internal$Types$Type$$findMemberInternal$1(Types.scala:1014)
> at scala.reflect.internal.Types$Type.findMember(Types.scala:1016)
> at scala.reflect.internal.Types$Type.memberBasedOnName(Types.scala:631)
> at scala.reflect.internal.Types$Type.member(Types.scala:600)
> at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
> at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:66)
> at scala.reflect.internal.Mirrors$RootsBase.staticPackage(Mirrors.scala:204)
> at scala.reflect.runtime.JavaMirrors$JavaMirror.staticPackage(JavaMirrors.scala:82)
> at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:263)
> at scala.reflect.runtime.JavaMirrors$class.scala$reflect$runtime$JavaMirrors$$createMirror(JavaMirrors.scala:32)
> at scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:49)
> at scala.reflect.runtime.JavaMirrors$$anonfun$runtimeMirror$1.apply(JavaMirrors.scala:47)
> at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
> at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
> at scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:46)
> at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
> at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:16)
> at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:45)
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:48)
> at org.apache.spark.sql.Encoders$.STRING(Encoders.scala:96)
> at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.createBaseDataset(CSVDataSource.scala:189)
> at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:147)
> at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:63)
> at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:57)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
> at scala.Option.orElse(Option.scala:289)
> at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:201)
> at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:392)
> at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:594)
> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:473)
> at test.FailingJob.theStuffToDo(FailingJob.scala:28)
> at test.FailingJob.call(FailingJob.scala:11)
> at test.Service$$anonfun$testProvider$2$$anonfun$apply$1.apply(Service.scala:22)
> at test.Service$$anonfun$testProvider$2$$anonfun$apply$1.apply(Service.scala:22)
> at org.apache.livy.scalaapi.LivyScalaClient$$anon$1.call(LivyScalaClient.scala:54)
> at org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:40)
> at org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27)
> at org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:64)
> at org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:45)
> at org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:27)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>
> As you can see, it's a Scala reflection error that occurs when trying to
> load class definitions, which makes me think it has something to do with the
> kryo serialization/deserialization. I've managed to isolate the issue and
> reproduce it in a small
> [example|https://github.com/steviej08/FailingLivySparkJob].
> It doesn't seem to matter what I do with the sequence within the function,
> but it must be used. As soon as the job reads the csv it fails. The key
> thing is that `someSeq` is of type `Seq[String]`. It works when I pass it in
> as an `Array[String]`. If I pass the param in but do not reference it, it
> does not fail.
> {code:scala}
> val mappedArray = someSeq.map(s => s.toUpperCase())
> val ss: SparkSession = scalaJobContext.sparkSession
> val csv: DataFrame = ss
>   .read
>   .option("header", "true")
>   .option("mode", "DROPMALFORMED")
>   .csv(filePath)
> println(csv.count())
> {code}
> I appreciate that the exception is being thrown within spark, but I can't
> seem to reproduce it without using the Livy RPC programmatic api.
> Other things to note: I am using spark version `2.3`; however, I have tried
> upgrading spark to `v2.4` with no luck. I have also tried generating the
> function from an object, without using a case class, with no luck. I have
> tried referencing the value as a property on an object, which did work. This
> is no good for me though, as I need (well, would much prefer) to pass in a
> parameter.
> I know that I can just use an Array, but that means I'd need to transform
> all my objects for transmission, which is a bit of a pain. Also, I am
> struggling to get this transformation working in production even though it
> works within my small example, but I am working on it currently.
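> For clarity, here is a minimal sketch of the Array transformation I mean
> (the names are illustrative, not the exact code from my example):
> {code:scala}
> // A wire-friendly twin of the parameter case class: its Seq field is
> // converted to an Array before the closure is shipped to Livy, so kryo
> // never has to deserialize a scala.collection.immutable.List (and its Nil).
> case class Params(someSeq: Seq[String])
> case class WireParams(someSeq: Array[String])
>
> def toWire(p: Params): WireParams = WireParams(p.someSeq.toArray)
>
> // Inside the job, convert back before use:
> // val someSeq: Seq[String] = wireParams.someSeq.toSeq
> {code}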
>
> Any help or insight on this would be amazing.
>
> Thanks,
> Stephen
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)