This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2e1a39cc6701 [SPARK-48896][ML][MLLIB] Avoid repartition when writing
out the metadata
2e1a39cc6701 is described below
commit 2e1a39cc67016a790f971a2d7310a08e2c7a0701
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue Jul 16 08:56:51 2024 -0700
[SPARK-48896][ML][MLLIB] Avoid repartition when writing out the metadata
### What changes were proposed in this pull request?
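This PR proposes to remove `repartition(1)` when writing metadata in
ML/MLlib. Each affected DataFrame is created from a single-element `Seq`
(`createDataFrame(Seq(data))`), so the write already produces one file.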
### Why are the changes needed?
To remove an unnecessary shuffle. See also
https://github.com/apache/spark/pull/47341
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests should cover these changes.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47347 from HyukjinKwon/SPARK-48896.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../main/scala/org/apache/spark/ml/classification/FMClassifier.scala | 2 +-
.../src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala | 2 +-
.../scala/org/apache/spark/ml/classification/LogisticRegression.scala | 2 +-
.../spark/ml/classification/MultilayerPerceptronClassifier.scala | 2 +-
.../main/scala/org/apache/spark/ml/classification/NaiveBayes.scala | 2 +-
.../main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala | 2 +-
.../org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala | 2 +-
.../src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala | 4 ++--
mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala | 2 +-
.../scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala | 2 +-
.../scala/org/apache/spark/ml/feature/VarianceThresholdSelector.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala | 2 +-
.../scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala | 2 +-
mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala | 2 +-
.../org/apache/spark/ml/regression/GeneralizedLinearRegression.scala | 2 +-
.../scala/org/apache/spark/ml/regression/IsotonicRegression.scala | 2 +-
.../main/scala/org/apache/spark/ml/regression/LinearRegression.scala | 2 +-
.../main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala | 4 ++--
.../spark/mllib/classification/impl/GLMClassificationModel.scala | 2 +-
.../org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala | 2 +-
31 files changed, 33 insertions(+), 33 deletions(-)
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
b/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
index 4a4a4fffe5de..aec740a932ac 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
@@ -342,7 +342,7 @@ object FMClassificationModel extends
MLReadable[FMClassificationModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.intercept, instance.linear, instance.factors)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 4bcc7877658d..3e27f781d561 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -449,7 +449,7 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.coefficients, instance.intercept)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index b3c48f13591f..ac0682f1df5b 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -1315,7 +1315,7 @@ object LogisticRegressionModel extends
MLReadable[LogisticRegressionModel] {
val data = Data(instance.numClasses, instance.numFeatures,
instance.interceptVector,
instance.coefficientMatrix, instance.isMultinomial)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index 0ae1f0e277ad..16984bf9aed8 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -369,7 +369,7 @@ object MultilayerPerceptronClassificationModel
// Save model data: weights
val data = Data(instance.weights)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index b7f9f97585fc..52486cb8aa24 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -591,7 +591,7 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] {
}
val data = Data(instance.pi, instance.theta, instance.sigma)
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index a68b2fc0dec8..0f6648bb4cda 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -243,7 +243,7 @@ object GaussianMixtureModel extends
MLReadable[GaussianMixtureModel] {
val sigmas = gaussians.map(c => OldMatrices.fromML(c.cov))
val data = Data(weights, mus, sigmas)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 3f6bdda9e050..7cbfc732a19c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -659,7 +659,7 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
val data = Data(instance.vocabSize, oldModel.topicsMatrix,
oldModel.docConcentration,
oldModel.topicConcentration, oldModel.gammaShape)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
index 16f72e18b977..d30962088cb8 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala
@@ -230,7 +230,7 @@ object BucketedRandomProjectionLSHModel extends
MLReadable[BucketedRandomProject
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.randMatrix)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
index 10149a65a954..3062f643e950 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
@@ -176,7 +176,7 @@ object ChiSqSelectorModel extends
MLReadable[ChiSqSelectorModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.selectedFeatures.toImmutableArraySeq)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
index 890266ed7a72..b81914f86fbb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
@@ -375,7 +375,7 @@ object CountVectorizerModel extends
MLReadable[CountVectorizerModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.vocabulary.toImmutableArraySeq)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index e451d4daffbc..696e1516582d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -201,7 +201,7 @@ object IDFModel extends MLReadable[IDFModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.idf, instance.docFreq, instance.numDocs)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
index 2d48a5f9f491..05ee59d1627d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -165,7 +165,7 @@ object MaxAbsScalerModel extends
MLReadable[MaxAbsScalerModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = new Data(instance.maxAbs)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
index cdedcc2de956..d94aadd1ce1f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
@@ -223,7 +223,7 @@ object MinHashLSHModel extends MLReadable[MinHashLSHModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.randCoefficients.flatMap(tuple =>
Array(tuple._1, tuple._2)))
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index 22c4ca9cddf4..4111e559a5c2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -250,7 +250,7 @@ object MinMaxScalerModel extends
MLReadable[MinMaxScalerModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = new Data(instance.originalMin, instance.originalMax)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
index e32addc7ee19..e7cf0105754a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
@@ -406,7 +406,7 @@ object OneHotEncoderModel extends
MLReadable[OneHotEncoderModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.categorySizes)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 16373a4c4af1..f7ec18b38a0e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -187,7 +187,7 @@ object PCAModel extends MLReadable[PCAModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.pc, instance.explainedVariance)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index f3f85b409867..7a47e73e5ef4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -506,7 +506,7 @@ private object ColumnPruner extends
MLReadable[ColumnPruner] {
// Save model data: columnsToPrune
val data = Data(instance.columnsToPrune.toSeq)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
@@ -598,7 +598,7 @@ private object VectorAttributeRewriter extends
MLReadable[VectorAttributeRewrite
// Save model data: vectorCol, prefixesToRewrite
val data = Data(instance.vectorCol, instance.prefixesToRewrite)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala
index df6e54ce12d9..0950dc55dccb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala
@@ -287,7 +287,7 @@ object RobustScalerModel extends
MLReadable[RobustScalerModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.range, instance.median)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index 92dee46ad005..c0a6392c29c3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -208,7 +208,7 @@ object StandardScalerModel extends
MLReadable[StandardScalerModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.std, instance.mean)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index 34f77f029395..2ca640445b55 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -512,7 +512,7 @@ object StringIndexerModel extends
MLReadable[StringIndexerModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.labelsArray)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala
index 35e5b27183ad..29a091012495 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala
@@ -352,7 +352,7 @@ object UnivariateFeatureSelectorModel extends
MLReadable[UnivariateFeatureSelect
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.selectedFeatures.toImmutableArraySeq)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/VarianceThresholdSelector.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/VarianceThresholdSelector.scala
index 82b49bd80067..df57e19f1a72 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/feature/VarianceThresholdSelector.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/feature/VarianceThresholdSelector.scala
@@ -190,7 +190,7 @@ object VarianceThresholdSelectorModel extends
MLReadable[VarianceThresholdSelect
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.selectedFeatures.toImmutableArraySeq)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index 8eb8f81227ca..4fed325e19e9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -522,7 +522,7 @@ object VectorIndexerModel extends
MLReadable[VectorIndexerModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.numFeatures, instance.categoryMaps)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 788ad65497df..d77d79dae4b8 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -498,7 +498,7 @@ object AFTSurvivalRegressionModel extends
MLReadable[AFTSurvivalRegressionModel]
// Save model data: coefficients, intercept, scale
val data = Data(instance.coefficients, instance.intercept,
instance.scale)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
b/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
index 6e09143e9ee7..8c797295e671 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
@@ -507,7 +507,7 @@ object FMRegressionModel extends
MLReadable[FMRegressionModel] {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val data = Data(instance.intercept, instance.linear, instance.factors)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 4ded2f8d7bf5..181a1a03e6f3 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -1145,7 +1145,7 @@ object GeneralizedLinearRegressionModel extends
MLReadable[GeneralizedLinearRegr
// Save model data: intercept, coefficients
val data = Data(instance.intercept, instance.coefficients)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index f1f2179ac4b3..29d8a00a4384 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -306,7 +306,7 @@ object IsotonicRegressionModel extends
MLReadable[IsotonicRegressionModel] {
val data = Data(
instance.oldModel.boundaries, instance.oldModel.predictions,
instance.oldModel.isotonic)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 23e536ce45eb..d5dce782770b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -784,7 +784,7 @@ private class InternalLinearRegressionModelWriter
// Save model data: intercept, coefficients, scale
val data = Data(instance.intercept, instance.coefficients, instance.scale)
val dataPath = new Path(path, "data").toString
-
sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+ sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
}
}
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 3d36b8270861..3bc1d592f989 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -201,7 +201,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
// Create Parquet data.
-
spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
+ spark.createDataFrame(Seq(data)).write.parquet(dataPath(path))
}
@Since("1.3.0")
@@ -246,7 +246,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
// Create Parquet data.
-
spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
+ spark.createDataFrame(Seq(data)).write.parquet(dataPath(path))
}
def load(sc: SparkContext, path: String): NaiveBayesModel = {
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
index 84491181d077..cb18a6003f7f 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
@@ -61,7 +61,7 @@ private[classification] object GLMClassificationModel {
// Create Parquet data.
val data = Data(weights, intercept, threshold)
-
spark.createDataFrame(Seq(data)).repartition(1).write.parquet(Loader.dataPath(path))
+ spark.createDataFrame(Seq(data)).write.parquet(Loader.dataPath(path))
}
/**
diff --git
a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
index cd90e97cc538..bbc513f93b38 100644
---
a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
+++
b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
@@ -57,7 +57,7 @@ private[regression] object GLMRegressionModel {
// Create Parquet data.
val data = Data(weights, intercept)
-
spark.createDataFrame(Seq(data)).repartition(1).write.parquet(Loader.dataPath(path))
+ spark.createDataFrame(Seq(data)).write.parquet(Loader.dataPath(path))
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]