This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 74fc14ef6333 [SPARK-50937][ML][PYTHON][CONNECT] Support `Imputer` on Connect
74fc14ef6333 is described below

commit 74fc14ef6333b89c04c83325647c2be906b5d0a1
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Sat Jan 25 17:59:09 2025 +0800

    [SPARK-50937][ML][PYTHON][CONNECT] Support `Imputer` on Connect
    
    ### What changes were proposed in this pull request?
    Support `Imputer` on Connect
    
    ### Why are the changes needed?
    for feature parity
    
    ### Does this PR introduce _any_ user-facing change?
    yes
    
    ### How was this patch tested?
    added test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #49667 from zhengruifeng/ml_connect_imputer.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
    (cherry picked from commit 4af6be6f4ae61f5d32662110d64af45d2234fbbc)
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../services/org.apache.spark.ml.Estimator         |  1 +
 .../services/org.apache.spark.ml.Transformer       |  1 +
 .../org/apache/spark/ml/feature/Imputer.scala      |  2 ++
 python/pyspark/ml/feature.py                       |  1 +
 python/pyspark/ml/tests/test_feature.py            | 42 ++++++++++++++++++++++
 .../org/apache/spark/sql/connect/ml/MLUtils.scala  |  1 +
 6 files changed, 48 insertions(+)

diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
index 449a07aed31d..1183f50ae7f3 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
@@ -43,6 +43,7 @@ org.apache.spark.ml.recommendation.ALS
 org.apache.spark.ml.fpm.FPGrowth
 
 # feature
+org.apache.spark.ml.feature.Imputer
 org.apache.spark.ml.feature.StandardScaler
 org.apache.spark.ml.feature.MaxAbsScaler
 org.apache.spark.ml.feature.MinMaxScaler
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
index f13924931e92..74a2c960a98b 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
@@ -56,6 +56,7 @@ org.apache.spark.ml.recommendation.ALSModel
 org.apache.spark.ml.fpm.FPGrowthModel
 
 # feature
+org.apache.spark.ml.feature.ImputerModel
 org.apache.spark.ml.feature.StandardScalerModel
 org.apache.spark.ml.feature.MaxAbsScalerModel
 org.apache.spark.ml.feature.MinMaxScalerModel
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
index ed093c4ba35d..2f51ae2d7fe3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
@@ -246,6 +246,8 @@ class ImputerModel private[ml] (
 
   import ImputerModel._
 
+  private[ml] def this() = this(Identifiable.randomUID("imputer"), null)
+
   /** @group setParam */
   @Since("3.0.0")
   def setInputCol(value: String): this.type = set(inputCol, value)
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 4c218267749c..4cc45c1bf194 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2261,6 +2261,7 @@ class ImputerModel(JavaModel, _ImputerParams, JavaMLReadable["ImputerModel"], Ja
 
     @property
     @since("2.2.0")
+    @try_remote_attribute_relation
     def surrogateDF(self) -> DataFrame:
         """
         Returns a DataFrame containing inputCols and their corresponding surrogates,
diff --git a/python/pyspark/ml/tests/test_feature.py b/python/pyspark/ml/tests/test_feature.py
index beb2a80443cc..8c201a01b338 100644
--- a/python/pyspark/ml/tests/test_feature.py
+++ b/python/pyspark/ml/tests/test_feature.py
@@ -34,6 +34,8 @@ from pyspark.ml.feature import (
     HashingTF,
     IDF,
     IDFModel,
+    Imputer,
+    ImputerModel,
     NGram,
     RFormula,
     Tokenizer,
@@ -541,6 +543,46 @@ class FeatureTestsMixin:
             model2 = Word2VecModel.load(d)
             self.assertEqual(str(model), str(model2))
 
+    def test_imputer(self):
+        spark = self.spark
+        df = spark.createDataFrame(
+            [
+                (1.0, float("nan")),
+                (2.0, float("nan")),
+                (float("nan"), 3.0),
+                (4.0, 4.0),
+                (5.0, 5.0),
+            ],
+            ["a", "b"],
+        )
+
+        imputer = Imputer(strategy="mean")
+        imputer.setInputCols(["a", "b"])
+        imputer.setOutputCols(["out_a", "out_b"])
+
+        self.assertEqual(imputer.getStrategy(), "mean")
+        self.assertEqual(imputer.getInputCols(), ["a", "b"])
+        self.assertEqual(imputer.getOutputCols(), ["out_a", "out_b"])
+
+        model = imputer.fit(df)
+        self.assertEqual(model.surrogateDF.columns, ["a", "b"])
+        self.assertEqual(model.surrogateDF.count(), 1)
+        self.assertEqual(list(model.surrogateDF.head()), [3.0, 4.0])
+
+        output = model.transform(df)
+        self.assertEqual(output.columns, ["a", "b", "out_a", "out_b"])
+        self.assertEqual(output.count(), 5)
+
+        # save & load
+        with tempfile.TemporaryDirectory(prefix="imputer") as d:
+            imputer.write().overwrite().save(d)
+            imputer2 = Imputer.load(d)
+            self.assertEqual(str(imputer), str(imputer2))
+
+            model.write().overwrite().save(d)
+            model2 = ImputerModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
     def test_count_vectorizer(self):
         df = self.spark.createDataFrame(
             [(0, ["a", "b", "c"]), (1, ["a", "b", "b", "c", "a"])],
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
index 833c00b1a5c3..cd6e13f33d2b 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
@@ -582,6 +582,7 @@ private[ml] object MLUtils {
     (classOf[FPGrowthModel], Set("associationRules", "freqItemsets")),
 
     // Feature Models
+    (classOf[ImputerModel], Set("surrogateDF")),
     (classOf[StandardScalerModel], Set("mean", "std")),
     (classOf[MaxAbsScalerModel], Set("maxAbs")),
     (classOf[MinMaxScalerModel], Set("originalMax", "originalMin")),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to