This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new cb3c50d5ed96 [SPARK-50877][ML][PYTHON][CONNECT] Support KMeans & 
BisectingKMeans on Connect
cb3c50d5ed96 is described below

commit cb3c50d5ed96f04882e70e4efc2ab46fb0f7c513
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Jan 20 17:30:11 2025 +0800

    [SPARK-50877][ML][PYTHON][CONNECT] Support KMeans & BisectingKMeans on 
Connect
    
    ### What changes were proposed in this pull request?
    Support KMeans & BisectingKMeans on Connect
    
    ### Why are the changes needed?
    For feature parity
    
    ### Does this PR introduce _any_ user-facing change?
    yes, new algorithms supported
    
    ### How was this patch tested?
    added tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #49567 from zhengruifeng/ml_connect_kmeans.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
    (cherry picked from commit 4443078ab8faecf81e5443e464ddd0092572636f)
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 dev/sparktestsupport/modules.py                    |   2 +
 .../services/org.apache.spark.ml.Estimator         |   5 +
 .../spark/ml/clustering/ClusteringSummary.scala    |   3 +-
 python/pyspark/ml/clustering.py                    |   3 +
 .../ml/tests/connect/test_parity_clustering.py     |  49 ++++++
 python/pyspark/ml/tests/test_clustering.py         | 191 +++++++++++++++++++++
 .../org/apache/spark/sql/connect/ml/MLUtils.scala  |   8 +-
 7 files changed, 259 insertions(+), 2 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 045328a05d8e..f34a33dd4b69 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -697,6 +697,7 @@ pyspark_ml = Module(
         "pyspark.ml.tests.connect.test_legacy_mode_tuning",
         "pyspark.ml.tests.test_classification",
         "pyspark.ml.tests.test_regression",
+        "pyspark.ml.tests.test_clustering",
     ],
     excluded_python_implementations=[
         "PyPy"  # Skip these tests under PyPy since they require numpy and it 
isn't available there
@@ -1119,6 +1120,7 @@ pyspark_ml_connect = Module(
         "pyspark.ml.tests.connect.test_connect_tuning",
         "pyspark.ml.tests.connect.test_parity_classification",
         "pyspark.ml.tests.connect.test_parity_regression",
+        "pyspark.ml.tests.connect.test_parity_clustering",
         "pyspark.ml.tests.connect.test_parity_evaluation",
     ],
     excluded_python_implementations=[
diff --git 
a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator 
b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
index b4b49ef09bbb..37b9c7e6aeb8 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
@@ -30,3 +30,8 @@ org.apache.spark.ml.regression.LinearRegression
 org.apache.spark.ml.regression.DecisionTreeRegressor
 org.apache.spark.ml.regression.RandomForestRegressor
 org.apache.spark.ml.regression.GBTRegressor
+
+
+# clustering
+org.apache.spark.ml.clustering.KMeans
+org.apache.spark.ml.clustering.BisectingKMeans
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala
index c90798d8e1f6..6b497fad20be 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.ml.clustering
 
 import org.apache.spark.annotation.Since
+import org.apache.spark.ml.util.Summary
 import org.apache.spark.sql.{DataFrame, Row}
 
 /**
@@ -34,7 +35,7 @@ class ClusteringSummary private[clustering] (
     val predictionCol: String,
     val featuresCol: String,
     val k: Int,
-    @Since("2.4.0") val numIter: Int) extends Serializable {
+    @Since("2.4.0") val numIter: Int) extends Summary with Serializable {
 
   /**
    * Cluster centers of the transformed data.
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 952c994c62ca..8a518dac380c 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -44,6 +44,7 @@ from pyspark.ml.util import (
     JavaMLReadable,
     GeneralJavaMLWritable,
     HasTrainingSummary,
+    try_remote_attribute_relation,
 )
 from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, 
JavaWrapper
 from pyspark.ml.common import inherit_doc, _java2py
@@ -91,6 +92,7 @@ class ClusteringSummary(JavaWrapper):
 
     @property
     @since("2.1.0")
+    @try_remote_attribute_relation
     def predictions(self) -> DataFrame:
         """
         DataFrame produced by the model's `transform` method.
@@ -115,6 +117,7 @@ class ClusteringSummary(JavaWrapper):
 
     @property
     @since("2.1.0")
+    @try_remote_attribute_relation
     def cluster(self) -> DataFrame:
         """
         DataFrame of predicted cluster centers for each training data point.
diff --git a/python/pyspark/ml/tests/connect/test_parity_clustering.py 
b/python/pyspark/ml/tests/connect/test_parity_clustering.py
new file mode 100644
index 000000000000..0297ce11c3c1
--- /dev/null
+++ b/python/pyspark/ml/tests/connect/test_parity_clustering.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import unittest
+
+from pyspark.ml.tests.test_clustering import ClusteringTestsMixin
+from pyspark.sql import SparkSession
+
+
+class ClusteringParityTests(ClusteringTestsMixin, unittest.TestCase):
+    def setUp(self) -> None:
+        self.spark = SparkSession.builder.remote(
+            os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[2]")
+        ).getOrCreate()
+
+    def test_assert_remote_mode(self):
+        from pyspark.sql import is_remote
+
+        self.assertTrue(is_remote())
+
+    def tearDown(self) -> None:
+        self.spark.stop()
+
+
+if __name__ == "__main__":
+    from pyspark.ml.tests.connect.test_parity_clustering import *  # noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/ml/tests/test_clustering.py 
b/python/pyspark/ml/tests/test_clustering.py
new file mode 100644
index 000000000000..44c809c061b8
--- /dev/null
+++ b/python/pyspark/ml/tests/test_clustering.py
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tempfile
+import unittest
+
+import numpy as np
+
+from pyspark.ml.linalg import Vectors
+from pyspark.sql import SparkSession
+from pyspark.ml.clustering import (
+    KMeans,
+    KMeansModel,
+    KMeansSummary,
+    BisectingKMeans,
+    BisectingKMeansModel,
+    BisectingKMeansSummary,
+)
+
+
+class ClusteringTestsMixin:
+    @property
+    def df(self):
+        return (
+            self.spark.createDataFrame(
+                [
+                    (1, 1.0, Vectors.dense([-0.1, -0.05])),
+                    (2, 2.0, Vectors.dense([-0.01, -0.1])),
+                    (3, 3.0, Vectors.dense([0.9, 0.8])),
+                    (4, 1.0, Vectors.dense([0.75, 0.935])),
+                    (5, 1.0, Vectors.dense([-0.83, -0.68])),
+                    (6, 1.0, Vectors.dense([-0.91, -0.76])),
+                ],
+                ["index", "weight", "features"],
+            )
+            .coalesce(1)
+            .sortWithinPartitions("index")
+        )
+
+    def test_kmeans(self):
+        df = self.df.select("weight", "features")
+
+        km = KMeans(
+            k=2,
+            maxIter=2,
+            weightCol="weight",
+        )
+        self.assertEqual(km.getK(), 2)
+        self.assertEqual(km.getMaxIter(), 2)
+        self.assertEqual(km.getWeightCol(), "weight")
+
+        # Estimator save & load
+        with tempfile.TemporaryDirectory(prefix="kmeans") as d:
+            km.write().overwrite().save(d)
+            km2 = KMeans.load(d)
+            self.assertEqual(str(km), str(km2))
+
+        model = km.fit(df)
+        # TODO: support KMeansModel.numFeatures in Python
+        # self.assertEqual(model.numFeatures, 2)
+
+        output = model.transform(df)
+        expected_cols = [
+            "weight",
+            "features",
+            "prediction",
+        ]
+        self.assertEqual(output.columns, expected_cols)
+        self.assertEqual(output.count(), 6)
+
+        self.assertTrue(np.allclose(model.predict(Vectors.dense(0.0, 5.0)), 1, 
atol=1e-4))
+
+        # Model summary
+        self.assertTrue(model.hasSummary)
+        summary = model.summary
+        self.assertTrue(isinstance(summary, KMeansSummary))
+        self.assertEqual(summary.k, 2)
+        self.assertEqual(summary.numIter, 2)
+        self.assertEqual(summary.clusterSizes, [4, 2])
+        self.assertTrue(np.allclose(summary.trainingCost, 1.35710375, 
atol=1e-4))
+
+        self.assertEqual(summary.featuresCol, "features")
+        self.assertEqual(summary.predictionCol, "prediction")
+
+        self.assertEqual(summary.cluster.columns, ["prediction"])
+        self.assertEqual(summary.cluster.count(), 6)
+
+        self.assertEqual(summary.predictions.columns, expected_cols)
+        self.assertEqual(summary.predictions.count(), 6)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="kmeans_model") as d:
+            model.write().overwrite().save(d)
+            model2 = KMeansModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
+    def test_bisecting_kmeans(self):
+        df = self.df.select("weight", "features")
+
+        bkm = BisectingKMeans(
+            k=2,
+            maxIter=2,
+            minDivisibleClusterSize=1.0,
+            weightCol="weight",
+        )
+        self.assertEqual(bkm.getK(), 2)
+        self.assertEqual(bkm.getMaxIter(), 2)
+        self.assertEqual(bkm.getMinDivisibleClusterSize(), 1.0)
+        self.assertEqual(bkm.getWeightCol(), "weight")
+
+        # Estimator save & load
+        with tempfile.TemporaryDirectory(prefix="bisecting_kmeans") as d:
+            bkm.write().overwrite().save(d)
+            bkm2 = BisectingKMeans.load(d)
+            self.assertEqual(str(bkm), str(bkm2))
+
+        model = bkm.fit(df)
+        # TODO: support KMeansModel.numFeatures in Python
+        # self.assertEqual(model.numFeatures, 2)
+
+        output = model.transform(df)
+        expected_cols = [
+            "weight",
+            "features",
+            "prediction",
+        ]
+        self.assertEqual(output.columns, expected_cols)
+        self.assertEqual(output.count(), 6)
+
+        self.assertTrue(np.allclose(model.predict(Vectors.dense(0.0, 5.0)), 1, 
atol=1e-4))
+
+        # BisectingKMeans-specific method: computeCost
+        self.assertTrue(np.allclose(model.computeCost(df), 1.164325125, 
atol=1e-4))
+
+        # Model summary
+        self.assertTrue(model.hasSummary)
+        summary = model.summary
+        self.assertTrue(isinstance(summary, BisectingKMeansSummary))
+        self.assertEqual(summary.k, 2)
+        self.assertEqual(summary.numIter, 2)
+        self.assertEqual(summary.clusterSizes, [4, 2])
+        self.assertTrue(np.allclose(summary.trainingCost, 1.3571037499999998, 
atol=1e-4))
+
+        self.assertEqual(summary.featuresCol, "features")
+        self.assertEqual(summary.predictionCol, "prediction")
+
+        self.assertEqual(summary.cluster.columns, ["prediction"])
+        self.assertEqual(summary.cluster.count(), 6)
+
+        self.assertEqual(summary.predictions.columns, expected_cols)
+        self.assertEqual(summary.predictions.count(), 6)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="bisecting_kmeans_model") as d:
+            model.write().overwrite().save(d)
+            model2 = BisectingKMeansModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
+
+class ClusteringTests(ClusteringTestsMixin, unittest.TestCase):
+    def setUp(self) -> None:
+        self.spark = SparkSession.builder.master("local[4]").getOrCreate()
+
+    def tearDown(self) -> None:
+        self.spark.stop()
+
+
+if __name__ == "__main__":
+    from pyspark.ml.tests.test_clustering import *  # noqa: F401,F403
+
+    try:
+        import xmlrunner  # type: ignore[import]
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
index 145afd90e77e..e220e69a62c5 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
@@ -442,7 +442,13 @@ private[ml] object MLUtils {
     "residuals", // LinearRegressionSummary
     "rootMeanSquaredError", // LinearRegressionSummary
     "tValues", // LinearRegressionSummary
-    "totalIterations" // LinearRegressionSummary
+    "totalIterations", // LinearRegressionSummary
+    "k", // KMeansSummary
+    "numIter", // KMeansSummary
+    "clusterSizes", // KMeansSummary
+    "trainingCost", // KMeansSummary
+    "cluster", // KMeansSummary
+    "computeCost" // BisectingKMeansModel
   )
 
   def invokeMethodAllowed(obj: Object, methodName: String): Object = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to