[ https://issues.apache.org/jira/browse/SPARK-21569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16106530#comment-16106530 ]

Ryan Williams commented on SPARK-21569:
---------------------------------------

Main.scala:

{code}
package com.foo

import java.nio.file.Files

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.serializer.{ KryoRegistrator, KryoSerializer }
import org.apache.spark.{ SparkConf, SparkContext }

object Main {
  def main(args: Array[String]): Unit = {
    // Require Kryo registration for every serialized class so unregistered classes fail fast
    val conf =
      new SparkConf()
        .set("spark.serializer", classOf[KryoSerializer].getCanonicalName)
        .set("spark.kryo.registrationRequired", "true")
        .set("spark.kryo.registrator", classOf[Registrar].getCanonicalName)

    val sc = new SparkContext(conf)

    val dir = Files.createTempDirectory("test")
    val path = dir.resolve("foo")
    // The task results of this job carry Spark's internal FileCommitProtocol$TaskCommitMessage,
    // which is what trips registrationRequired
    sc
      .parallelize(1 to 100)
      .map(x ⇒ x.toString → x.toString)
      .saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](path.toString)

    val hpath = new Path(dir.toUri)
    hpath.getFileSystem(sc.hadoopConfiguration).delete(hpath, true)

    sc.stop()
  }
}

class Registrar extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Range])
  }
}
{code}

build.sbt:

{code}
scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0" % "provided"

artifactName := ((_, _, _) ⇒ "save-hadoop-file.jar")
crossTarget in packageBin := baseDirectory.value
{code}

from shell:

{code}
sbt package

# Passes in Spark 2.1.1, fails in Spark 2.2.0
spark-submit save-hadoop-file.jar
{code}
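
A possible workaround until Spark registers the class by default is to register it by name in the custom registrator. This is only a sketch based on the hint in the error message (it assumes the internal class name stays stable), not a confirmed fix:

{code}
class Registrar extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Range])
    // The failing class lives in Spark's internal package, so register it by name
    // rather than referencing the type at compile time.
    kryo.register(
      Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage")
    )
  }
}
{code}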


> Internal Spark class needs to be kryo-registered
> ------------------------------------------------
>
>                 Key: SPARK-21569
>                 URL: https://issues.apache.org/jira/browse/SPARK-21569
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Ryan Williams
>
> [Full repro here|https://github.com/ryan-williams/spark-bugs/tree/hf]
> As of 2.2.0, {{saveAsNewAPIHadoopFile}} jobs fail (when {{spark.kryo.registrationRequired=true}}) with:
> {code}
> java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage
> Note: To register this class use: kryo.register(org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage.class);
>       at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:458)
>       at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
>       at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
>       at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
>       at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:315)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:383)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> This internal Spark class should be kryo-registered by Spark by default.
> This was not a problem in 2.1.1.


