This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cc321373e223 Revert "[SPARK-48883][ML][R] Replace RDD read / write API 
invocation with Dataframe read / write API"
cc321373e223 is described below

commit cc321373e223d6c87d3ec58160cbe3911e0fc466
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Sat Jul 13 17:33:59 2024 +0900

    Revert "[SPARK-48883][ML][R] Replace RDD read / write API invocation with 
Dataframe read / write API"
    
    This reverts commit 0fa5787d0a6bd17ccd05ff561bc8dfa88af03312.
---
 .../org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala      | 7 ++-----
 mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala       | 7 ++-----
 .../main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala | 7 ++-----
 .../org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala     | 7 ++-----
 .../org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala      | 7 ++-----
 .../main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala    | 7 ++-----
 .../src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala | 7 ++-----
 mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala  | 4 +---
 .../main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala   | 7 ++-----
 .../main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala    | 7 ++-----
 .../main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala | 7 ++-----
 .../apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala    | 7 ++-----
 .../scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala   | 7 ++-----
 mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala    | 7 ++-----
 mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala       | 7 ++-----
 .../scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala     | 7 ++-----
 mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala | 7 ++-----
 .../scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala   | 7 ++-----
 .../apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala | 4 +---
 .../src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala  | 7 ++-----
 mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala        | 3 +--
 .../org/apache/spark/ml/r/RandomForestClassifierWrapper.scala     | 8 ++------
 .../org/apache/spark/ml/r/RandomForestRegressorWrapper.scala      | 8 ++------
 mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala     | 6 ++----
 24 files changed, 45 insertions(+), 114 deletions(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index 67057b3fcef6..7eef3ced422e 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -129,9 +129,7 @@ private[r] object AFTSurvivalRegressionWrapper extends 
MLReadable[AFTSurvivalReg
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -144,8 +142,7 @@ private[r] object AFTSurvivalRegressionWrapper extends 
MLReadable[AFTSurvivalReg
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
index 5fc19450d219..125cdf7259fe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
@@ -94,9 +94,7 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("ratingCol" -> instance.ratingCol)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.alsModel.save(modelPath)
     }
@@ -109,8 +107,7 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] 
{
       val rMetadataPath = new Path(path, "rMetadata").toString
       val modelPath = new Path(path, "model").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val ratingCol = (rMetadata \ "ratingCol").extract[String]
       val alsModel = ALSModel.load(modelPath)
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
index c86a788ab330..d4486f1b80a1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala
@@ -120,9 +120,7 @@ private[r] object BisectingKMeansWrapper extends 
MLReadable[BisectingKMeansWrapp
         ("size" -> instance.size.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -135,8 +133,7 @@ private[r] object BisectingKMeansWrapper extends 
MLReadable[BisectingKMeansWrapp
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val size = (rMetadata \ "size").extract[Array[Long]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
index 51b8f7ce869c..992a0c18819f 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala
@@ -131,9 +131,7 @@ private[r] object DecisionTreeClassifierWrapper extends 
MLReadable[DecisionTreeC
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -146,8 +144,7 @@ private[r] object DecisionTreeClassifierWrapper extends 
MLReadable[DecisionTreeC
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
index 40fb32daf42e..db421b5a1875 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala
@@ -114,9 +114,7 @@ private[r] object DecisionTreeRegressorWrapper extends 
MLReadable[DecisionTreeRe
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -129,8 +127,7 @@ private[r] object DecisionTreeRegressorWrapper extends 
MLReadable[DecisionTreeRe
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
index 3ab631b9be0b..635af0563da0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala
@@ -151,9 +151,7 @@ private[r] object FMClassifierWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -166,8 +164,7 @@ private[r] object FMClassifierWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
index e1ec2f47fb03..b036a1d102d9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala
@@ -132,9 +132,7 @@ private[r] object FMRegressorWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -147,8 +145,7 @@ private[r] object FMRegressorWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
index ff227e8b7812..b8151d8d9070 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala
@@ -78,9 +78,7 @@ private[r] object FPGrowthWrapper extends 
MLReadable[FPGrowthWrapper] {
         "class" -> instance.getClass.getName
       ))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.fpGrowthModel.save(modelPath)
     }
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
index 5bf021ca3bd4..777191ef5e5c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala
@@ -138,9 +138,7 @@ private[r] object GBTClassifierWrapper extends 
MLReadable[GBTClassifierWrapper]
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -153,8 +151,7 @@ private[r] object GBTClassifierWrapper extends 
MLReadable[GBTClassifierWrapper]
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
index efc7ab21a77f..6e5ca47fabae 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala
@@ -122,9 +122,7 @@ private[r] object GBTRegressorWrapper extends 
MLReadable[GBTRegressorWrapper] {
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -137,8 +135,7 @@ private[r] object GBTRegressorWrapper extends 
MLReadable[GBTRegressorWrapper] {
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
index 14a090690f8a..9a98a8b18b14 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
@@ -113,9 +113,7 @@ private[r] object GaussianMixtureWrapper extends 
MLReadable[GaussianMixtureWrapp
         ("logLikelihood" -> instance.logLikelihood)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -128,8 +126,7 @@ private[r] object GaussianMixtureWrapper extends 
MLReadable[GaussianMixtureWrapp
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val dim = (rMetadata \ "dim").extract[Int]
       val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index fb3a00071362..60cf0631f91d 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -170,9 +170,7 @@ private[r] object GeneralizedLinearRegressionWrapper
         ("rAic" -> instance.rAic) ~
         ("rNumIterations" -> instance.rNumIterations)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -186,8 +184,7 @@ private[r] object GeneralizedLinearRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]]
       val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
index e2df133c1c1e..d4a3adea460f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
@@ -99,9 +99,7 @@ private[r] object IsotonicRegressionWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -114,8 +112,7 @@ private[r] object IsotonicRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
index 4073b69e46b4..78c9a15aac59 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
@@ -123,9 +123,7 @@ private[r] object KMeansWrapper extends 
MLReadable[KMeansWrapper] {
         ("size" -> instance.size.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -138,8 +136,7 @@ private[r] object KMeansWrapper extends 
MLReadable[KMeansWrapper] {
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val size = (rMetadata \ "size").extract[Array[Long]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
index 26998edab271..943c38178d6f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
@@ -198,9 +198,7 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] 
{
         ("logPerplexity" -> instance.logPerplexity) ~
         ("vocabulary" -> instance.vocabulary.toList)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -213,8 +211,7 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] 
{
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
       val logPerplexity = (rMetadata \ "logPerplexity").extract[Double]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
index 3c720ed82b9f..96b00fab7e34 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala
@@ -127,9 +127,7 @@ private[r] object LinearRegressionWrapper
       val rMetadata = ("class" -> instance.getClass.getName) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -142,8 +140,7 @@ private[r] object LinearRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
index 5f77be9e5641..3645af3e5311 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
@@ -137,9 +137,7 @@ private[r] object LinearSVCWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -152,8 +150,7 @@ private[r] object LinearSVCWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
index 429ca7ba04fc..cac3d0609b20 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala
@@ -192,9 +192,7 @@ private[r] object LogisticRegressionWrapper
         ("features" -> instance.features.toImmutableArraySeq) ~
         ("labels" -> instance.labels.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -207,8 +205,7 @@ private[r] object LogisticRegressionWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val features = (rMetadata \ "features").extract[Array[String]]
       val labels = (rMetadata \ "labels").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
index b8e466099af4..96c588acc140 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -142,9 +142,7 @@ private[r] object MultilayerPerceptronClassifierWrapper
 
       val rMetadata = "class" -> instance.getClass.getName
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
index 30c5ddbe80d7..d5e8e0ef4890 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
@@ -102,9 +102,7 @@ private[r] object NaiveBayesWrapper extends 
MLReadable[NaiveBayesWrapper] {
         ("labels" -> instance.labels.toImmutableArraySeq) ~
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 
       instance.pipeline.save(pipelinePath)
     }
@@ -117,8 +115,7 @@ private[r] object NaiveBayesWrapper extends 
MLReadable[NaiveBayesWrapper] {
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val labels = (rMetadata \ "labels").extract[Array[String]]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index 3a7539e0937f..551c7514ee85 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -33,8 +33,7 @@ private[r] object RWrappers extends MLReader[Object] {
   override def load(path: String): Object = {
     implicit val format = DefaultFormats
     val rMetadataPath = new Path(path, "rMetadata").toString
-    val rMetadataStr = sparkSession.read.text(rMetadataPath)
-      .first().getString(0)
+    val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
     val rMetadata = parse(rMetadataStr)
     val className = (rMetadata \ "class").extract[String]
     className match {
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
index b3f040e2e95f..7c4175a6c591 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala
@@ -141,10 +141,7 @@ private[r] object RandomForestClassifierWrapper extends 
MLReadable[RandomForestC
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
-
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -157,8 +154,7 @@ private[r] object RandomForestClassifierWrapper extends 
MLReadable[RandomForestC
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
index 9c583e2c53bd..911571cac77d 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala
@@ -124,10 +124,7 @@ private[r] object RandomForestRegressorWrapper extends 
MLReadable[RandomForestRe
         ("features" -> instance.features.toImmutableArraySeq)
       val rMetadataJson: String = compact(render(rMetadata))
 
-      sparkSession.createDataFrame(
-        Seq(Tuple1(rMetadataJson))
-      ).repartition(1).write.text(rMetadataPath)
-
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
       instance.pipeline.save(pipelinePath)
     }
   }
@@ -140,8 +137,7 @@ private[r] object RandomForestRegressorWrapper extends 
MLReadable[RandomForestRe
       val pipelinePath = new Path(path, "pipeline").toString
       val pipeline = PipelineModel.load(pipelinePath)
 
-      val rMetadataStr = sparkSession.read.text(rMetadataPath)
-        .first().getString(0)
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
       val rMetadata = parse(rMetadataStr)
       val formula = (rMetadata \ "formula").extract[String]
       val features = (rMetadata \ "features").extract[Array[String]]
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala 
b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
index e4a274ee1483..9b26d0a911ac 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -411,8 +411,7 @@ private[ml] object DefaultParamsWriter {
       paramMap: Option[JValue] = None): Unit = {
     val metadataPath = new Path(path, "metadata").toString
     val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
-    val spark = SparkSession.getActiveSession.get
-    
spark.createDataFrame(Seq(Tuple1(metadataJson))).repartition(1).write.text(metadataPath)
+    sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
   }
 
   /**
@@ -586,8 +585,7 @@ private[ml] object DefaultParamsReader {
    */
   def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = 
""): Metadata = {
     val metadataPath = new Path(path, "metadata").toString
-    val spark = SparkSession.getActiveSession.get
-    val metadataStr = spark.read.text(metadataPath).first().getString(0)
+    val metadataStr = sc.textFile(metadataPath, 1).first()
     parseMetadata(metadataStr, expectedClassName)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to