This is an automated email from the ASF dual-hosted git repository.
weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0fa5787d0a6b [SPARK-48883][ML][R] Replace RDD read / write API
invocation with Dataframe read / write API
0fa5787d0a6b is described below
commit 0fa5787d0a6bd17ccd05ff561bc8dfa88af03312
Author: Weichen Xu <[email protected]>
AuthorDate: Fri Jul 12 22:20:37 2024 +0800
[SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe
read / write API
### What changes were proposed in this pull request?
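Replace RDD read/write API invocations (`sc.parallelize(...).saveAsTextFile(...)` and `sc.textFile(...).first()`) with DataFrame read/write API calls (`createDataFrame(...).write.text(...)` and `read.text(...).first()`) when saving and loading ML and R-wrapper metadata.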
### Why are the changes needed?
In Databricks Runtime, the RDD read/write API has issues with certain
storage types that require an account key, whereas the DataFrame read/write
API works with them.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47328 from WeichenXu123/ml-df-writer-save-2.
Authored-by: Weichen Xu <[email protected]>
Signed-off-by: Weichen Xu <[email protected]>
---
.../org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala | 7 +++++--
mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala | 7 +++++--
.../main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala | 7 +++++--
.../org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala | 7 +++++--
.../org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala | 7 +++++--
.../main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala | 7 +++++--
.../src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala | 7 +++++--
mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala | 4 +++-
.../main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala | 7 +++++--
.../main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala | 7 +++++--
.../main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala | 7 +++++--
.../apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala | 7 +++++--
.../scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala | 7 +++++--
mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala | 7 +++++--
mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala | 7 +++++--
.../scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala | 7 +++++--
mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala | 7 +++++--
.../scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala | 7 +++++--
.../apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala | 4 +++-
.../src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala | 7 +++++--
mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala | 3 ++-
.../org/apache/spark/ml/r/RandomForestClassifierWrapper.scala | 8 ++++++--
.../org/apache/spark/ml/r/RandomForestRegressorWrapper.scala | 8 ++++++--
mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala | 6 ++++--
24 files changed, 114 insertions(+), 45 deletions(-)
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index 7eef3ced422e..67057b3fcef6 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends
MLReadable[AFTSurvivalReg
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -142,7 +144,8 @@ private[r] object AFTSurvivalRegressionWrapper extends
MLReadable[AFTSurvivalReg
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
index 125cdf7259fe..5fc19450d219 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
@@ -94,7 +94,9 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
val rMetadata = ("class" -> instance.getClass.getName) ~
("ratingCol" -> instance.ratingCol)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.alsModel.save(modelPath)
}
@@ -107,7 +109,8 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper]
{
val rMetadataPath = new Path(path, "rMetadata").toString
val modelPath = new Path(path, "model").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val ratingCol = (rMetadata \ "ratingCol").extract[String]
val alsModel = ALSModel.load(modelPath)
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
index d4486f1b80a1..c86a788ab330 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
@@ -120,7 +120,9 @@ private[r] object BisectingKMeansWrapper extends
MLReadable[BisectingKMeansWrapp
("size" -> instance.size.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -133,7 +135,8 @@ private[r] object BisectingKMeansWrapper extends
MLReadable[BisectingKMeansWrapp
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val size = (rMetadata \ "size").extract[Array[Long]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
index 992a0c18819f..51b8f7ce869c 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
@@ -131,7 +131,9 @@ private[r] object DecisionTreeClassifierWrapper extends
MLReadable[DecisionTreeC
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -144,7 +146,8 @@ private[r] object DecisionTreeClassifierWrapper extends
MLReadable[DecisionTreeC
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
index db421b5a1875..40fb32daf42e 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
@@ -114,7 +114,9 @@ private[r] object DecisionTreeRegressorWrapper extends
MLReadable[DecisionTreeRe
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -127,7 +129,8 @@ private[r] object DecisionTreeRegressorWrapper extends
MLReadable[DecisionTreeRe
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
index 635af0563da0..3ab631b9be0b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
@@ -151,7 +151,9 @@ private[r] object FMClassifierWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -164,7 +166,8 @@ private[r] object FMClassifierWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
index b036a1d102d9..e1ec2f47fb03 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
@@ -132,7 +132,9 @@ private[r] object FMRegressorWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -145,7 +147,8 @@ private[r] object FMRegressorWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
index b8151d8d9070..ff227e8b7812 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
@@ -78,7 +78,9 @@ private[r] object FPGrowthWrapper extends
MLReadable[FPGrowthWrapper] {
"class" -> instance.getClass.getName
))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.fpGrowthModel.save(modelPath)
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
index 777191ef5e5c..5bf021ca3bd4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
@@ -138,7 +138,9 @@ private[r] object GBTClassifierWrapper extends
MLReadable[GBTClassifierWrapper]
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -151,7 +153,8 @@ private[r] object GBTClassifierWrapper extends
MLReadable[GBTClassifierWrapper]
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
index 6e5ca47fabae..efc7ab21a77f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
@@ -122,7 +122,9 @@ private[r] object GBTRegressorWrapper extends
MLReadable[GBTRegressorWrapper] {
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -135,7 +137,8 @@ private[r] object GBTRegressorWrapper extends
MLReadable[GBTRegressorWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
index 9a98a8b18b14..14a090690f8a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
@@ -113,7 +113,9 @@ private[r] object GaussianMixtureWrapper extends
MLReadable[GaussianMixtureWrapp
("logLikelihood" -> instance.logLikelihood)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -126,7 +128,8 @@ private[r] object GaussianMixtureWrapper extends
MLReadable[GaussianMixtureWrapp
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val dim = (rMetadata \ "dim").extract[Int]
val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index 60cf0631f91d..fb3a00071362 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -170,7 +170,9 @@ private[r] object GeneralizedLinearRegressionWrapper
("rAic" -> instance.rAic) ~
("rNumIterations" -> instance.rNumIterations)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -184,7 +186,8 @@ private[r] object GeneralizedLinearRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]]
val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
index d4a3adea460f..e2df133c1c1e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
@@ -99,7 +99,9 @@ private[r] object IsotonicRegressionWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -112,7 +114,8 @@ private[r] object IsotonicRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
index 78c9a15aac59..4073b69e46b4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
@@ -123,7 +123,9 @@ private[r] object KMeansWrapper extends
MLReadable[KMeansWrapper] {
("size" -> instance.size.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -136,7 +138,8 @@ private[r] object KMeansWrapper extends
MLReadable[KMeansWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val size = (rMetadata \ "size").extract[Array[Long]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
index 943c38178d6f..26998edab271 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
@@ -198,7 +198,9 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper]
{
("logPerplexity" -> instance.logPerplexity) ~
("vocabulary" -> instance.vocabulary.toList)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -211,7 +213,8 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper]
{
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
val logPerplexity = (rMetadata \ "logPerplexity").extract[Double]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
index 96b00fab7e34..3c720ed82b9f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
@@ -127,7 +127,9 @@ private[r] object LinearRegressionWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -140,7 +142,8 @@ private[r] object LinearRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
index 3645af3e5311..5f77be9e5641 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
@@ -137,7 +137,9 @@ private[r] object LinearSVCWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -150,7 +152,8 @@ private[r] object LinearSVCWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
index cac3d0609b20..429ca7ba04fc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
@@ -192,7 +192,9 @@ private[r] object LogisticRegressionWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -205,7 +207,8 @@ private[r] object LogisticRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
index 96c588acc140..b8e466099af4 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -142,7 +142,9 @@ private[r] object MultilayerPerceptronClassifierWrapper
val rMetadata = "class" -> instance.getClass.getName
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
index d5e8e0ef4890..30c5ddbe80d7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
@@ -102,7 +102,9 @@ private[r] object NaiveBayesWrapper extends
MLReadable[NaiveBayesWrapper] {
("labels" -> instance.labels.toImmutableArraySeq) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
@@ -115,7 +117,8 @@ private[r] object NaiveBayesWrapper extends
MLReadable[NaiveBayesWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val labels = (rMetadata \ "labels").extract[Array[String]]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index 551c7514ee85..3a7539e0937f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -33,7 +33,8 @@ private[r] object RWrappers extends MLReader[Object] {
override def load(path: String): Object = {
implicit val format = DefaultFormats
val rMetadataPath = new Path(path, "rMetadata").toString
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val className = (rMetadata \ "class").extract[String]
className match {
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
index 7c4175a6c591..b3f040e2e95f 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
@@ -141,7 +141,10 @@ private[r] object RandomForestClassifierWrapper extends
MLReadable[RandomForestC
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
+
instance.pipeline.save(pipelinePath)
}
}
@@ -154,7 +157,8 @@ private[r] object RandomForestClassifierWrapper extends
MLReadable[RandomForestC
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
index 911571cac77d..9c583e2c53bd 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
@@ -124,7 +124,10 @@ private[r] object RandomForestRegressorWrapper extends
MLReadable[RandomForestRe
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
- sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ sparkSession.createDataFrame(
+ Seq(Tuple1(rMetadataJson))
+ ).repartition(1).write.text(rMetadataPath)
+
instance.pipeline.save(pipelinePath)
}
}
@@ -137,7 +140,8 @@ private[r] object RandomForestRegressorWrapper extends
MLReadable[RandomForestRe
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)
- val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadataStr = sparkSession.read.text(rMetadataPath)
+ .first().getString(0)
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
index 9b26d0a911ac..e4a274ee1483 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -411,7 +411,8 @@ private[ml] object DefaultParamsWriter {
paramMap: Option[JValue] = None): Unit = {
val metadataPath = new Path(path, "metadata").toString
val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
- sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
+ val spark = SparkSession.getActiveSession.get
+
spark.createDataFrame(Seq(Tuple1(metadataJson))).repartition(1).write.text(metadataPath)
}
/**
@@ -585,7 +586,8 @@ private[ml] object DefaultParamsReader {
*/
def loadMetadata(path: String, sc: SparkContext, expectedClassName: String =
""): Metadata = {
val metadataPath = new Path(path, "metadata").toString
- val metadataStr = sc.textFile(metadataPath, 1).first()
+ val spark = SparkSession.getActiveSession.get
+ val metadataStr = spark.read.text(metadataPath).first().getString(0)
parseMetadata(metadataStr, expectedClassName)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]