[ https://issues.apache.org/jira/browse/SPARK-19681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boris Clémençon closed SPARK-19681. ------------------------------------ Resolution: Fixed > save and load pipeline and then use it yields java.lang.RuntimeException > ----------------------------------------------------------------------- > > Key: SPARK-19681 > URL: https://issues.apache.org/jira/browse/SPARK-19681 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 2.1.0 > Reporter: Boris Clémençon > Labels: spark-ml > > Here is the unit test that fails: > import org.apache.spark.SparkConf > import org.apache.spark.ml.Pipeline > import org.apache.spark.ml.classification.LogisticRegression > import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator > import org.apache.spark.ml.feature.{SQLTransformer, VectorAssembler} > import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, > ParamGridBuilder} > import org.apache.spark.sql.{DataFrame, SparkSession} > import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} > import scala.util.Random > /** > * Created by borisclemencon on 21/02/2017. > */ > class PipelineTest extends FlatSpec with Matchers with BeforeAndAfter { > val featuresCol = "features" > val responseCol = "response" > val weightCol = "weight" > val features = Array("X1", "X2") > val lambdas = Array(0.01) > val alpha = 0.2 > val maxIter = 50 > val nfolds = 5 > var spark: SparkSession = _ > before { > val sparkConf: SparkConf = new SparkConf(). > set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"). > set("spark.ui.enabled", "false"). // faster and remove 'spark test > java.net.BindException: Address already in use' warnings! > set("spark.driver.host", "127.0.0.1") > spark = SparkSession. > builder(). > config(sparkConf). > appName("BlendWeightTransformerTest"). > master("local[*]"). 
> getOrCreate() > } > def makeDataset(n: Int = 100): DataFrame = { > val sc = spark > import sc.implicits._ > val n = 1000 > val data = > for (i <- 1 to n) yield { > val pn = if (Random.nextDouble() < 0.1) "a" else "b" > val x1: Double = Random.nextGaussian() * 5 > val x2: Double = Random.nextGaussian() * 2 > val response: Int = if (Random.nextBoolean()) 1 else 0 > (pn, x1, x2, response) > } > data.toDF(packageNameCol, "X1", "X2", responseCol) > } > "load()" should "produce the same pipeline and result before and after > save()" in { > val lr = new LogisticRegression(). > setFitIntercept(true). > setMaxIter(maxIter). > setElasticNetParam(alpha). > setStandardization(true). > setFamily("binomial"). > setFeaturesCol(featuresCol). > setLabelCol(responseCol) > val assembler = new > VectorAssembler().setInputCols(features).setOutputCol(featuresCol) > val pipeline = new Pipeline().setStages(Array(assembler, lr)) > val evaluator = new BinaryClassificationEvaluator(). > setLabelCol(responseCol). > setMetricName("areaUnderROC") > val paramGrid = new ParamGridBuilder(). > addGrid(lr.regParam, lambdas). > build() > // Train with simple grid cross validation > val cv = new CrossValidator(). > setEstimator(pipeline). > setEvaluator(evaluator). > setEstimatorParamMaps(paramGrid). 
> setNumFolds(nfolds) // Use 3+ in practice > val df = makeDataset(100).cache > val cvModel = cv.fit(df) > val answer = cvModel.transform(df) > answer.show(truncate = false) > val path = "./PipelineTestcvModel" > cvModel.write.overwrite().save(path) > val cvModelLoaded = CrossValidatorModel.load(path) > val output = cvModelLoaded.transform(df) > output.show(truncate = false) > Compare.assertDataFrameEquals(answer, output) > } > } > yields the following exception: > should produce the same blent pipeline and result before and after save() *** > FAILED *** > [info] java.lang.RuntimeException: no default for type > org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 > [info] at > org.apache.spark.sql.catalyst.expressions.Literal$.default(literals.scala:179) > [info] at > org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:121) > [info] at > org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:114) > [info] at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > [info] at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > [info] at scala.collection.immutable.List.foreach(List.scala:381) > [info] at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > [info] at scala.collection.immutable.List.flatMap(List.scala:344) > [info] at > org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$.unapply(patterns.scala:114) > [info] at > org.apache.spark.sql.execution.SparkStrategies$JoinSelection$.apply(SparkStrategies.scala:158) -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org