This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fa5787d0a6b [SPARK-48883][ML][R] Replace RDD read / write API 
invocation with Dataframe read / write API
0fa5787d0a6b is described below

commit 0fa5787d0a6bd17ccd05ff561bc8dfa88af03312
Author: Weichen Xu <[email protected]>
AuthorDate: Fri Jul 12 22:20:37 2024 +0800

    [SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe 
read / write API
    
    ### What changes were proposed in this pull request?
    
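    Replace RDD read / write API invocations with DataFrame read / write API
    calls when saving and loading ML metadata: `sc.parallelize(...).saveAsTextFile(path)`
    becomes `createDataFrame(...).repartition(1).write.text(path)`, and
    `sc.textFile(path, 1).first()` becomes `read.text(path).first().getString(0)`.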
    
    ### Why are the changes needed?
    
    In Databricks Runtime, the RDD read / write API has issues with certain 
storage types that require an account key, but the DataFrame read / write API 
works.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #47328 from WeichenXu123/ml-df-writer-save-2.
    
    Authored-by: Weichen Xu <[email protected]>
    Signed-off-by: Weichen Xu <[email protected]>
---
 .../org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala      | 7 +++++--
 mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala       | 7 +++++--
 .../main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala | 7 +++++--
 .../org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala     | 7 +++++--
 .../org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala      | 7 +++++--
 .../main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala    | 7 +++++--
 .../src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala | 7 +++++--
 mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala  | 4 +++-
 .../main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala   | 7 +++++--
 .../main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala    | 7 +++++--
 .../main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala | 7 +++++--
 .../apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala    | 7 +++++--
 .../scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala   | 7 +++++--
 mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala    | 7 +++++--
 mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala       | 7 +++++--
 .../scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala     | 7 +++++--
 mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala | 7 +++++--
 .../scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala   | 7 +++++--
 .../apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala | 4 +++-
 .../src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala  | 7 +++++--
 mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala        | 3 ++-
 .../org/apache/spark/ml/r/RandomForestClassifierWrapper.scala     | 8 ++++++--
 .../org/apache/spark/ml/r/RandomForestRegressorWrapper.scala      | 8 ++++++--
 mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala     | 6 ++++--
 24 files changed, 114 insertions(+), 45 deletions(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index 7eef3ced422e..67057b3fcef6 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends 
MLReadable[AFTSurvivalReg
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -142,7 +144,8 @@ private[r] object AFTSurvivalRegressionWrapper extends 
MLReadable[AFTSurvivalReg
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
index 125cdf7259fe..5fc19450d219 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
@@ -94,7 +94,9 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("ratingCol" -> instance.ratingCol)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.alsModel.save(modelPath)
     }
@@ -107,7 +109,8 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] 
{
       val rMetadataPath = new Path(path, "rMetadata").toString
       val modelPath = new Path(path, "model").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val ratingCol = (rMetadata \ "ratingCol").extract[String]
       val alsModel = ALSModel.load(modelPath)
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
index d4486f1b80a1..c86a788ab330 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
@@ -120,7 +120,9 @@ private[r] object BisectingKMeansWrapper extends 
MLReadable[BisectingKMeansWrapp
         ("size" -> instance.size.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -133,7 +135,8 @@ private[r] object BisectingKMeansWrapper extends 
MLReadable[BisectingKMeansWrapp
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val size = (rMetadata \ "size").extract[Array[Long]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
index 992a0c18819f..51b8f7ce869c 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
@@ -131,7 +131,9 @@ private[r] object DecisionTreeClassifierWrapper extends 
MLReadable[DecisionTreeC
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -144,7 +146,8 @@ private[r] object DecisionTreeClassifierWrapper extends 
MLReadable[DecisionTreeC
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
index db421b5a1875..40fb32daf42e 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
@@ -114,7 +114,9 @@ private[r] object DecisionTreeRegressorWrapper extends 
MLReadable[DecisionTreeRe
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -127,7 +129,8 @@ private[r] object DecisionTreeRegressorWrapper extends 
MLReadable[DecisionTreeRe
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
index 635af0563da0..3ab631b9be0b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
@@ -151,7 +151,9 @@ private[r] object FMClassifierWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -164,7 +166,8 @@ private[r] object FMClassifierWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
index b036a1d102d9..e1ec2f47fb03 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
@@ -132,7 +132,9 @@ private[r] object FMRegressorWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -145,7 +147,8 @@ private[r] object FMRegressorWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
index b8151d8d9070..ff227e8b7812 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
@@ -78,7 +78,9 @@ private[r] object FPGrowthWrapper extends 
MLReadable[FPGrowthWrapper] {
         "class" -> instance.getClass.getName
       ))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.fpGrowthModel.save(modelPath)
     }
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
index 777191ef5e5c..5bf021ca3bd4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
@@ -138,7 +138,9 @@ private[r] object GBTClassifierWrapper extends 
MLReadable[GBTClassifierWrapper]
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -151,7 +153,8 @@ private[r] object GBTClassifierWrapper extends 
MLReadable[GBTClassifierWrapper]
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
index 6e5ca47fabae..efc7ab21a77f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
@@ -122,7 +122,9 @@ private[r] object GBTRegressorWrapper extends 
MLReadable[GBTRegressorWrapper] {
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -135,7 +137,8 @@ private[r] object GBTRegressorWrapper extends 
MLReadable[GBTRegressorWrapper] {
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
index 9a98a8b18b14..14a090690f8a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
@@ -113,7 +113,9 @@ private[r] object GaussianMixtureWrapper extends 
MLReadable[GaussianMixtureWrapp
         ("logLikelihood" -> instance.logLikelihood)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -126,7 +128,8 @@ private[r] object GaussianMixtureWrapper extends 
MLReadable[GaussianMixtureWrapp
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val dim = (rMetadata \ "dim").extract[Int]
       val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index 60cf0631f91d..fb3a00071362 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -170,7 +170,9 @@ private[r] object GeneralizedLinearRegressionWrapper
         ("rAic" -> instance.rAic) ~
         ("rNumIterations" -> instance.rNumIterations)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -184,7 +186,8 @@ private[r] object GeneralizedLinearRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]]
       val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
index d4a3adea460f..e2df133c1c1e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
@@ -99,7 +99,9 @@ private[r] object IsotonicRegressionWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -112,7 +114,8 @@ private[r] object IsotonicRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
index 78c9a15aac59..4073b69e46b4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
@@ -123,7 +123,9 @@ private[r] object KMeansWrapper extends 
MLReadable[KMeansWrapper] {
         ("size" -> instance.size.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -136,7 +138,8 @@ private[r] object KMeansWrapper extends 
MLReadable[KMeansWrapper] {
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val size = (rMetadata \ "size").extract[Array[Long]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
index 943c38178d6f..26998edab271 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
@@ -198,7 +198,9 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] 
{
         ("logPerplexity" -> instance.logPerplexity) ~
         ("vocabulary" -> instance.vocabulary.toList)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -211,7 +213,8 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] 
{
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
       val logPerplexity = (rMetadata \ "logPerplexity").extract[Double]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
index 96b00fab7e34..3c720ed82b9f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
@@ -127,7 +127,9 @@ private[r] object LinearRegressionWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -140,7 +142,8 @@ private[r] object LinearRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
index 3645af3e5311..5f77be9e5641 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
@@ -137,7 +137,9 @@ private[r] object LinearSVCWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -150,7 +152,8 @@ private[r] object LinearSVCWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
index cac3d0609b20..429ca7ba04fc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
@@ -192,7 +192,9 @@ private[r] object LogisticRegressionWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -205,7 +207,8 @@ private[r] object LogisticRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
index 96c588acc140..b8e466099af4 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -142,7 +142,9 @@ private[r] object MultilayerPerceptronClassifierWrapper
 
       val rMetadata = "class" -> instance.getClass.getName
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
index d5e8e0ef4890..30c5ddbe80d7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
@@ -102,7 +102,9 @@ private[r] object NaiveBayesWrapper extends 
MLReadable[NaiveBayesWrapper] {
         ("labels" -> instance.labels.toImmutableArraySeq) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -115,7 +117,8 @@ private[r] object NaiveBayesWrapper extends 
MLReadable[NaiveBayesWrapper] {
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val labels = (rMetadata \ "labels").extract[Array[String]]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index 551c7514ee85..3a7539e0937f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -33,7 +33,8 @@ private[r] object RWrappers extends MLReader[Object] {
   override def load(path: String): Object = {
     implicit val format = DefaultFormats
     val rMetadataPath = new Path(path, "rMetadata").toString
-    val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+    val rMetadataStr = sparkSession.read.text(rMetadataPath)
+      .first().getString(0)
     val rMetadata = parse(rMetadataStr)
     val className = (rMetadata \ "class").extract[String]
     className match {
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
index 7c4175a6c591..b3f040e2e95f 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
@@ -141,7 +141,10 @@ private[r] object RandomForestClassifierWrapper extends 
MLReadable[RandomForestC
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
+
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -154,7 +157,8 @@ private[r] object RandomForestClassifierWrapper extends 
MLReadable[RandomForestC
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
index 911571cac77d..9c583e2c53bd 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
@@ -124,7 +124,10 @@ private[r] object RandomForestRegressorWrapper extends 
MLReadable[RandomForestRe
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+      sparkSession.createDataFrame(
+        Seq(Tuple1(rMetadataJson))
+      ).repartition(1).write.text(rMetadataPath)
+
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -137,7 +140,8 @@ private[r] object RandomForestRegressorWrapper extends 
MLReadable[RandomForestRe
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadataStr = sparkSession.read.text(rMetadataPath)
+        .first().getString(0)
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala 
b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
index 9b26d0a911ac..e4a274ee1483 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -411,7 +411,8 @@ private[ml] object DefaultParamsWriter {
       paramMap: Option[JValue] = None): Unit = {
     val metadataPath = new Path(path, "metadata").toString
     val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
-    sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
+    val spark = SparkSession.getActiveSession.get
+    
spark.createDataFrame(Seq(Tuple1(metadataJson))).repartition(1).write.text(metadataPath)
   }
 
   /**
@@ -585,7 +586,8 @@ private[ml] object DefaultParamsReader {
    */
   def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = 
""): Metadata = {
     val metadataPath = new Path(path, "metadata").toString
-    val metadataStr = sc.textFile(metadataPath, 1).first()
+    val spark = SparkSession.getActiveSession.get
+    val metadataStr = spark.read.text(metadataPath).first().getString(0)
     parseMetadata(metadataStr, expectedClassName)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to