This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cc321373e223 Revert "[SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API"
cc321373e223 is described below
commit cc321373e223d6c87d3ec58160cbe3911e0fc466
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Sat Jul 13 17:33:59 2024 +0900
Revert "[SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API"
This reverts commit 0fa5787d0a6bd17ccd05ff561bc8dfa88af03312.
---
.../org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala | 7 ++-----
mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala | 7 ++-----
.../main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala | 7 ++-----
.../org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala | 7 ++-----
.../org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala | 7 ++-----
.../main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala | 7 ++-----
.../src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala | 7 ++-----
mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala | 4 +---
.../main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala | 7 ++-----
.../main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala | 7 ++-----
.../main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala | 7 ++-----
.../apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala | 7 ++-----
.../scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala | 7 ++-----
mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala | 7 ++-----
mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala | 7 ++-----
.../scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala | 7 ++-----
mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala | 7 ++-----
.../scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala | 7 ++-----
.../apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala | 4 +---
.../src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala | 7 ++-----
mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala | 3 +--
.../org/apache/spark/ml/r/RandomForestClassifierWrapper.scala | 8 ++------
.../org/apache/spark/ml/r/RandomForestRegressorWrapper.scala | 8 ++------
mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala | 6 ++----
24 files changed, 45 insertions(+), 114 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index 67057b3fcef6..7eef3ced422e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -129,9 +129,7 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -144,8 +142,7 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
index 5fc19450d219..125cdf7259fe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
@@ -94,9 +94,7 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
val rMetadata = ("class" -> instance.getClass.getName) ~
("ratingCol" -> instance.ratingCol)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.alsModel.save(modelPath)
}
@@ -109,8 +107,7 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val modelPath = new Path(path, "model").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val ratingCol = (rMetadata \ "ratingCol").extract[String]
val alsModel = ALSModel.load(modelPath)
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
index c86a788ab330..d4486f1b80a1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
@@ -120,9 +120,7 @@ private[r] object BisectingKMeansWrapper extends
MLReadable[BisectingKMeansWrapp
("size" -> instance.size.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -135,8 +133,7 @@ private[r] object BisectingKMeansWrapper extends
MLReadable[BisectingKMeansWrapp
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val size = (rMetadata \ "size").extract[Array[Long]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
index 51b8f7ce869c..992a0c18819f 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
@@ -131,9 +131,7 @@ private[r] object DecisionTreeClassifierWrapper extends
MLReadable[DecisionTreeC
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -146,8 +144,7 @@ private[r] object DecisionTreeClassifierWrapper extends
MLReadable[DecisionTreeC
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
index 40fb32daf42e..db421b5a1875 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
@@ -114,9 +114,7 @@ private[r] object DecisionTreeRegressorWrapper extends
MLReadable[DecisionTreeRe
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -129,8 +127,7 @@ private[r] object DecisionTreeRegressorWrapper extends
MLReadable[DecisionTreeRe
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
index 3ab631b9be0b..635af0563da0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
@@ -151,9 +151,7 @@ private[r] object FMClassifierWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -166,8 +164,7 @@ private[r] object FMClassifierWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
index e1ec2f47fb03..b036a1d102d9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
@@ -132,9 +132,7 @@ private[r] object FMRegressorWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -147,8 +145,7 @@ private[r] object FMRegressorWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
index ff227e8b7812..b8151d8d9070 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
@@ -78,9 +78,7 @@ private[r] object FPGrowthWrapper extends
MLReadable[FPGrowthWrapper] {
"class" -> instance.getClass.getName
))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.fpGrowthModel.save(modelPath)
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
index 5bf021ca3bd4..777191ef5e5c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
@@ -138,9 +138,7 @@ private[r] object GBTClassifierWrapper extends
MLReadable[GBTClassifierWrapper]
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -153,8 +151,7 @@ private[r] object GBTClassifierWrapper extends
MLReadable[GBTClassifierWrapper]
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
index efc7ab21a77f..6e5ca47fabae 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
@@ -122,9 +122,7 @@ private[r] object GBTRegressorWrapper extends
MLReadable[GBTRegressorWrapper] {
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -137,8 +135,7 @@ private[r] object GBTRegressorWrapper extends
MLReadable[GBTRegressorWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
index 14a090690f8a..9a98a8b18b14 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
@@ -113,9 +113,7 @@ private[r] object GaussianMixtureWrapper extends
MLReadable[GaussianMixtureWrapp
("logLikelihood" -> instance.logLikelihood)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -128,8 +126,7 @@ private[r] object GaussianMixtureWrapper extends
MLReadable[GaussianMixtureWrapp
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val dim = (rMetadata \ "dim").extract[Int]
val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index fb3a00071362..60cf0631f91d 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -170,9 +170,7 @@ private[r] object GeneralizedLinearRegressionWrapper
("rAic" -> instance.rAic) ~
("rNumIterations" -> instance.rNumIterations)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -186,8 +184,7 @@ private[r] object GeneralizedLinearRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]]
val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
index e2df133c1c1e..d4a3adea460f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
@@ -99,9 +99,7 @@ private[r] object IsotonicRegressionWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -114,8 +112,7 @@ private[r] object IsotonicRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
index 4073b69e46b4..78c9a15aac59 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
@@ -123,9 +123,7 @@ private[r] object KMeansWrapper extends
MLReadable[KMeansWrapper] {
("size" -> instance.size.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -138,8 +136,7 @@ private[r] object KMeansWrapper extends
MLReadable[KMeansWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val size = (rMetadata \ "size").extract[Array[Long]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
index 26998edab271..943c38178d6f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
@@ -198,9 +198,7 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
("logPerplexity" -> instance.logPerplexity) ~
("vocabulary" -> instance.vocabulary.toList)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -213,8 +211,7 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
val logPerplexity = (rMetadata \ "logPerplexity").extract[Double]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
index 3c720ed82b9f..96b00fab7e34 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
@@ -127,9 +127,7 @@ private[r] object LinearRegressionWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -142,8 +140,7 @@ private[r] object LinearRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
index 5f77be9e5641..3645af3e5311 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
@@ -137,9 +137,7 @@ private[r] object LinearSVCWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -152,8 +150,7 @@ private[r] object LinearSVCWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
index 429ca7ba04fc..cac3d0609b20 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
@@ -192,9 +192,7 @@ private[r] object LogisticRegressionWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -207,8 +205,7 @@ private[r] object LogisticRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
index b8e466099af4..96c588acc140 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -142,9 +142,7 @@ private[r] object MultilayerPerceptronClassifierWrapper
val rMetadata = "class" -> instance.getClass.getName
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
index 30c5ddbe80d7..d5e8e0ef4890 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
@@ -102,9 +102,7 @@ private[r] object NaiveBayesWrapper extends
MLReadable[NaiveBayesWrapper] {
("labels" -> instance.labels.toImmutableArraySeq) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -117,8 +115,7 @@ private[r] object NaiveBayesWrapper extends
MLReadable[NaiveBayesWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val labels = (rMetadata \ "labels").extract[Array[String]]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index 3a7539e0937f..551c7514ee85 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -33,8 +33,7 @@ private[r] object RWrappers extends MLReader[Object] {
override def load(path: String): Object = {
implicit val format = DefaultFormats
val rMetadataPath = new Path(path, "rMetadata").toString
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val className = (rMetadata \ "class").extract[String]
className match {
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
index b3f040e2e95f..7c4175a6c591 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
@@ -141,10 +141,7 @@ private[r] object RandomForestClassifierWrapper extends
MLReadable[RandomForestC
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
-
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -157,8 +154,7 @@ private[r] object RandomForestClassifierWrapper extends
MLReadable[RandomForestC
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
index 9c583e2c53bd..911571cac77d 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
@@ -124,10 +124,7 @@ private[r] object RandomForestRegressorWrapper extends
MLReadable[RandomForestRe
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sparkSession.createDataFrame(
- Seq(Tuple1(rMetadataJson))
- ).repartition(1).write.text(rMetadataPath)
-
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -140,8 +137,7 @@ private[r] object RandomForestRegressorWrapper extends
MLReadable[RandomForestRe
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sparkSession.read.text(rMetadataPath)
- .first().getString(0)
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
index e4a274ee1483..9b26d0a911ac 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -411,8 +411,7 @@ private[ml] object DefaultParamsWriter {
paramMap: Option[JValue] = None): Unit = {
val metadataPath = new Path(path, "metadata").toString
val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
- val spark = SparkSession.getActiveSession.get
- spark.createDataFrame(Seq(Tuple1(metadataJson))).repartition(1).write.text(metadataPath)
+ sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
}
/**
@@ -586,8 +585,7 @@ private[ml] object DefaultParamsReader {
*/
def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata = {
val metadataPath = new Path(path, "metadata").toString
- val spark = SparkSession.getActiveSession.get
- val metadataStr = spark.read.text(metadataPath).first().getString(0)
+ val metadataStr = sc.textFile(metadataPath, 1).first()
parseMetadata(metadataStr, expectedClassName)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]