[
https://issues.apache.org/jira/browse/SPARK-21569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106530#comment-16106530
]
Ryan Williams commented on SPARK-21569:
---------------------------------------
Main.scala:
{code}
package com.foo
import java.nio.file.Files
import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.serializer.{ KryoRegistrator, KryoSerializer }
import org.apache.spark.{ SparkConf, SparkContext }
/**
 * Minimal reproduction for SPARK-21569.
 *
 * Configures Spark to use Kryo with `spark.kryo.registrationRequired=true` and the
 * custom [[Registrar]], then writes the pairs ("1", "1") ... ("100", "100") into a
 * fresh temporary directory via `saveAsNewAPIHadoopFile` with `TextOutputFormat`.
 * Per the report, this passes on Spark 2.1.1 and fails on 2.2.0 because the internal
 * `FileCommitProtocol$TaskCommitMessage` is not Kryo-registered.
 *
 * The temporary directory is deleted and the SparkContext is stopped even when the
 * job throws. A failing run therefore leaks neither resource, and the original
 * exception still propagates to the caller.
 */
object Main {
  def main(args: Array[String]): Unit = {
    val conf =
      new SparkConf()
        .set("spark.serializer", classOf[KryoSerializer].getCanonicalName)
        // Reject serialization of any class that has no explicit Kryo registration.
        .set("spark.kryo.registrationRequired", "true")
        .set("spark.kryo.registrator", classOf[Registrar].getCanonicalName)

    val sc = new SparkContext(conf)
    try {
      val dir = Files.createTempDirectory("test")
      val hpath = new Path(dir.toUri)
      try {
        val path = dir.resolve("foo")
        sc
          .parallelize(1 to 100)
          .map(x ⇒ x.toString → x.toString)
          .saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](path.toString)
      } finally {
        // Recursively remove the temp dir (and any job output under it), even when
        // the save above failed.
        hpath.getFileSystem(sc.hadoopConfiguration).delete(hpath, true)
      }
    } finally {
      // Always release the SparkContext, whether or not the job succeeded.
      sc.stop()
    }
  }
}
/**
 * Kryo registrator selected through the `spark.kryo.registrator` setting.
 *
 * It registers exactly one class, `Range`. Presumably this covers the `1 to 100`
 * handed to `parallelize`. Everything else Kryo is asked to serialize must be
 * registered elsewhere (e.g. by Spark itself) when
 * `spark.kryo.registrationRequired` is enabled.
 */
class Registrar extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    val rangeClass = classOf[Range]
    kryo.register(rangeClass)
  }
}
{code}
build.sbt:
{code}
// Scala version; `%%` below appends the matching binary suffix (spark-core_2.11).
scalaVersion := "2.11.8"
// "provided": the Spark installation running spark-submit supplies spark-core at runtime.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0" % "provided"
// Use a fixed jar file name, independent of Scala version, module, and artifact.
artifactName := ((_, _, _) ⇒ "save-hadoop-file.jar")
// Place the packaged jar in the project's base directory.
crossTarget in packageBin := baseDirectory.value
{code}
From a shell:
{code}
# Build save-hadoop-file.jar into the project's base directory (see build.sbt).
sbt package
# Passes in Spark 2.1.1, fails in Spark 2.2.0
# spark-core is "provided", so the Spark version under test is whichever installation's
# spark-submit is on PATH.
# NOTE(review): no --class is passed; this assumes sbt recorded com.foo.Main as the jar's
# manifest Main-Class. Add `--class com.foo.Main` if that is not the case.
spark-submit save-hadoop-file.jar
{code}
> Internal Spark class needs to be kryo-registered
> ------------------------------------------------
>
> Key: SPARK-21569
> URL: https://issues.apache.org/jira/browse/SPARK-21569
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: Ryan Williams
>
> [Full repro here|https://github.com/ryan-williams/spark-bugs/tree/hf]
> As of 2.2.0, {{saveAsNewAPIHadoopFile}} jobs fail (when
> {{spark.kryo.registrationRequired=true}}) with:
> {code}
> java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage
> Note: To register this class use: kryo.register(org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage.class);
> at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:458)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
> at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:315)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> This internal Spark class should be kryo-registered by Spark by default.
> This was not a problem in 2.1.1.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org