This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new cb3c50d5ed96 [SPARK-50877][ML][PYTHON][CONNECT] Support KMeans &
BisectingKMeans on Connect
cb3c50d5ed96 is described below
commit cb3c50d5ed96f04882e70e4efc2ab46fb0f7c513
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Jan 20 17:30:11 2025 +0800
[SPARK-50877][ML][PYTHON][CONNECT] Support KMeans & BisectingKMeans on
Connect
### What changes were proposed in this pull request?
Support KMeans & BisectingKMeans on Connect
### Why are the changes needed?
For feature parity
### Does this PR introduce _any_ user-facing change?
yes, new algorithms supported
### How was this patch tested?
added tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #49567 from zhengruifeng/ml_connect_kmeans.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
(cherry picked from commit 4443078ab8faecf81e5443e464ddd0092572636f)
Signed-off-by: Ruifeng Zheng <[email protected]>
---
dev/sparktestsupport/modules.py | 2 +
.../services/org.apache.spark.ml.Estimator | 5 +
.../spark/ml/clustering/ClusteringSummary.scala | 3 +-
python/pyspark/ml/clustering.py | 3 +
.../ml/tests/connect/test_parity_clustering.py | 49 ++++++
python/pyspark/ml/tests/test_clustering.py | 191 +++++++++++++++++++++
.../org/apache/spark/sql/connect/ml/MLUtils.scala | 8 +-
7 files changed, 259 insertions(+), 2 deletions(-)
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 045328a05d8e..f34a33dd4b69 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -697,6 +697,7 @@ pyspark_ml = Module(
"pyspark.ml.tests.connect.test_legacy_mode_tuning",
"pyspark.ml.tests.test_classification",
"pyspark.ml.tests.test_regression",
+ "pyspark.ml.tests.test_clustering",
],
excluded_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy and it
isn't available there
@@ -1119,6 +1120,7 @@ pyspark_ml_connect = Module(
"pyspark.ml.tests.connect.test_connect_tuning",
"pyspark.ml.tests.connect.test_parity_classification",
"pyspark.ml.tests.connect.test_parity_regression",
+ "pyspark.ml.tests.connect.test_parity_clustering",
"pyspark.ml.tests.connect.test_parity_evaluation",
],
excluded_python_implementations=[
diff --git
a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
index b4b49ef09bbb..37b9c7e6aeb8 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
@@ -30,3 +30,8 @@ org.apache.spark.ml.regression.LinearRegression
org.apache.spark.ml.regression.DecisionTreeRegressor
org.apache.spark.ml.regression.RandomForestRegressor
org.apache.spark.ml.regression.GBTRegressor
+
+
+# clustering
+org.apache.spark.ml.clustering.KMeans
+org.apache.spark.ml.clustering.BisectingKMeans
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala
b/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala
index c90798d8e1f6..6b497fad20be 100644
---
a/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala
+++
b/mllib/src/main/scala/org/apache/spark/ml/clustering/ClusteringSummary.scala
@@ -18,6 +18,7 @@
package org.apache.spark.ml.clustering
import org.apache.spark.annotation.Since
+import org.apache.spark.ml.util.Summary
import org.apache.spark.sql.{DataFrame, Row}
/**
@@ -34,7 +35,7 @@ class ClusteringSummary private[clustering] (
val predictionCol: String,
val featuresCol: String,
val k: Int,
- @Since("2.4.0") val numIter: Int) extends Serializable {
+ @Since("2.4.0") val numIter: Int) extends Summary with Serializable {
/**
* Cluster centers of the transformed data.
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 952c994c62ca..8a518dac380c 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -44,6 +44,7 @@ from pyspark.ml.util import (
JavaMLReadable,
GeneralJavaMLWritable,
HasTrainingSummary,
+ try_remote_attribute_relation,
)
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams,
JavaWrapper
from pyspark.ml.common import inherit_doc, _java2py
@@ -91,6 +92,7 @@ class ClusteringSummary(JavaWrapper):
@property
@since("2.1.0")
+ @try_remote_attribute_relation
def predictions(self) -> DataFrame:
"""
DataFrame produced by the model's `transform` method.
@@ -115,6 +117,7 @@ class ClusteringSummary(JavaWrapper):
@property
@since("2.1.0")
+ @try_remote_attribute_relation
def cluster(self) -> DataFrame:
"""
DataFrame of predicted cluster centers for each training data point.
diff --git a/python/pyspark/ml/tests/connect/test_parity_clustering.py
b/python/pyspark/ml/tests/connect/test_parity_clustering.py
new file mode 100644
index 000000000000..0297ce11c3c1
--- /dev/null
+++ b/python/pyspark/ml/tests/connect/test_parity_clustering.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import unittest
+
+from pyspark.ml.tests.test_clustering import ClusteringTestsMixin
+from pyspark.sql import SparkSession
+
+
class ClusteringParityTests(ClusteringTestsMixin, unittest.TestCase):
    """Run the shared clustering test suite against a Spark Connect session."""

    def setUp(self) -> None:
        # Default to an in-process local[2] Connect server unless the
        # environment points at an external one.
        remote_url = os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[2]")
        self.spark = SparkSession.builder.remote(remote_url).getOrCreate()

    def test_assert_remote_mode(self):
        # Sanity check that this suite really exercises the Connect path.
        from pyspark.sql import is_remote

        self.assertTrue(is_remote())

    def tearDown(self) -> None:
        self.spark.stop()
+
+
if __name__ == "__main__":
    from pyspark.ml.tests.connect.test_parity_clustering import *  # noqa: F401

    # Emit JUnit-style XML reports when xmlrunner is installed (CI);
    # otherwise fall back to the default text runner.
    try:
        import xmlrunner  # type: ignore[import]
    except ImportError:
        runner = None
    else:
        runner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    unittest.main(testRunner=runner, verbosity=2)
diff --git a/python/pyspark/ml/tests/test_clustering.py
b/python/pyspark/ml/tests/test_clustering.py
new file mode 100644
index 000000000000..44c809c061b8
--- /dev/null
+++ b/python/pyspark/ml/tests/test_clustering.py
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tempfile
+import unittest
+
+import numpy as np
+
+from pyspark.ml.linalg import Vectors
+from pyspark.sql import SparkSession
+from pyspark.ml.clustering import (
+ KMeans,
+ KMeansModel,
+ KMeansSummary,
+ BisectingKMeans,
+ BisectingKMeansModel,
+ BisectingKMeansSummary,
+)
+
+
class ClusteringTestsMixin:
    """Shared test bodies for KMeans and BisectingKMeans.

    Subclasses supply ``self.spark`` in setUp/tearDown, so the identical
    assertions run under both classic Spark and Spark Connect (see the
    parity test module).
    """

    @property
    def df(self):
        """Six-row weighted training DataFrame with two well-separated clusters.

        Coalesced to a single partition and sorted by ``index`` so that the
        fit is deterministic and the hard-coded expectations below
        (cluster sizes, training cost) are stable across runs.
        """
        return (
            self.spark.createDataFrame(
                [
                    (1, 1.0, Vectors.dense([-0.1, -0.05])),
                    (2, 2.0, Vectors.dense([-0.01, -0.1])),
                    (3, 3.0, Vectors.dense([0.9, 0.8])),
                    (4, 1.0, Vectors.dense([0.75, 0.935])),
                    (5, 1.0, Vectors.dense([-0.83, -0.68])),
                    (6, 1.0, Vectors.dense([-0.91, -0.76])),
                ],
                ["index", "weight", "features"],
            )
            .coalesce(1)
            .sortWithinPartitions("index")
        )

    def test_kmeans(self):
        """End-to-end KMeans check: params, persistence, fit, transform, summary."""
        df = self.df.select("weight", "features")

        km = KMeans(
            k=2,
            maxIter=2,
            weightCol="weight",
        )
        # Getters reflect the constructor arguments.
        self.assertEqual(km.getK(), 2)
        self.assertEqual(km.getMaxIter(), 2)
        self.assertEqual(km.getWeightCol(), "weight")

        # Estimator save & load: round-trip preserves the string form.
        with tempfile.TemporaryDirectory(prefix="kmeans") as d:
            km.write().overwrite().save(d)
            km2 = KMeans.load(d)
            self.assertEqual(str(km), str(km2))

        model = km.fit(df)
        # TODO: support KMeansModel.numFeatures in Python
        # self.assertEqual(model.numFeatures, 2)

        # transform appends only the prediction column.
        output = model.transform(df)
        expected_cols = [
            "weight",
            "features",
            "prediction",
        ]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 6)

        # Single-vector prediction on a point far from both centers.
        self.assertTrue(np.allclose(model.predict(Vectors.dense(0.0, 5.0)), 1, atol=1e-4))

        # Model summary
        self.assertTrue(model.hasSummary)
        summary = model.summary
        self.assertTrue(isinstance(summary, KMeansSummary))
        self.assertEqual(summary.k, 2)
        self.assertEqual(summary.numIter, 2)
        self.assertEqual(summary.clusterSizes, [4, 2])
        # Expected cost is deterministic thanks to the single sorted partition.
        self.assertTrue(np.allclose(summary.trainingCost, 1.35710375, atol=1e-4))

        self.assertEqual(summary.featuresCol, "features")
        self.assertEqual(summary.predictionCol, "prediction")

        # Summary relations are fetched lazily (remote relation on Connect).
        self.assertEqual(summary.cluster.columns, ["prediction"])
        self.assertEqual(summary.cluster.count(), 6)

        self.assertEqual(summary.predictions.columns, expected_cols)
        self.assertEqual(summary.predictions.count(), 6)

        # Model save & load
        with tempfile.TemporaryDirectory(prefix="kmeans_model") as d:
            model.write().overwrite().save(d)
            model2 = KMeansModel.load(d)
            self.assertEqual(str(model), str(model2))

    def test_bisecting_kmeans(self):
        """End-to-end BisectingKMeans check, mirroring test_kmeans plus computeCost."""
        df = self.df.select("weight", "features")

        bkm = BisectingKMeans(
            k=2,
            maxIter=2,
            minDivisibleClusterSize=1.0,
            weightCol="weight",
        )
        # Getters reflect the constructor arguments.
        self.assertEqual(bkm.getK(), 2)
        self.assertEqual(bkm.getMaxIter(), 2)
        self.assertEqual(bkm.getMinDivisibleClusterSize(), 1.0)
        self.assertEqual(bkm.getWeightCol(), "weight")

        # Estimator save & load
        with tempfile.TemporaryDirectory(prefix="bisecting_kmeans") as d:
            bkm.write().overwrite().save(d)
            bkm2 = BisectingKMeans.load(d)
            self.assertEqual(str(bkm), str(bkm2))

        model = bkm.fit(df)
        # TODO: support KMeansModel.numFeatures in Python
        # self.assertEqual(model.numFeatures, 2)

        output = model.transform(df)
        expected_cols = [
            "weight",
            "features",
            "prediction",
        ]
        self.assertEqual(output.columns, expected_cols)
        self.assertEqual(output.count(), 6)

        self.assertTrue(np.allclose(model.predict(Vectors.dense(0.0, 5.0)), 1, atol=1e-4))

        # BisectingKMeans-specific method: computeCost
        self.assertTrue(np.allclose(model.computeCost(df), 1.164325125, atol=1e-4))

        # Model summary
        self.assertTrue(model.hasSummary)
        summary = model.summary
        self.assertTrue(isinstance(summary, BisectingKMeansSummary))
        self.assertEqual(summary.k, 2)
        self.assertEqual(summary.numIter, 2)
        self.assertEqual(summary.clusterSizes, [4, 2])
        self.assertTrue(np.allclose(summary.trainingCost, 1.3571037499999998, atol=1e-4))

        self.assertEqual(summary.featuresCol, "features")
        self.assertEqual(summary.predictionCol, "prediction")

        self.assertEqual(summary.cluster.columns, ["prediction"])
        self.assertEqual(summary.cluster.count(), 6)

        self.assertEqual(summary.predictions.columns, expected_cols)
        self.assertEqual(summary.predictions.count(), 6)

        # Model save & load
        with tempfile.TemporaryDirectory(prefix="bisecting_kmeans_model") as d:
            model.write().overwrite().save(d)
            model2 = BisectingKMeansModel.load(d)
            self.assertEqual(str(model), str(model2))
+
+
class ClusteringTests(ClusteringTestsMixin, unittest.TestCase):
    """Run the shared clustering test suite on a classic local SparkSession."""

    def setUp(self) -> None:
        builder = SparkSession.builder.master("local[4]")
        self.spark = builder.getOrCreate()

    def tearDown(self) -> None:
        self.spark.stop()
+
+
if __name__ == "__main__":
    from pyspark.ml.tests.test_clustering import *  # noqa: F401,F403

    # Prefer JUnit-style XML output for CI when xmlrunner is available;
    # otherwise use the default text runner.
    try:
        import xmlrunner  # type: ignore[import]
    except ImportError:
        runner = None
    else:
        runner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
    unittest.main(testRunner=runner, verbosity=2)
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
index 145afd90e77e..e220e69a62c5 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
@@ -442,7 +442,13 @@ private[ml] object MLUtils {
"residuals", // LinearRegressionSummary
"rootMeanSquaredError", // LinearRegressionSummary
"tValues", // LinearRegressionSummary
- "totalIterations" // LinearRegressionSummary
+ "totalIterations", // LinearRegressionSummary
+ "k", // KMeansSummary
+ "numIter", // KMeansSummary
+ "clusterSizes", // KMeansSummary
+ "trainingCost", // KMeansSummary
+ "cluster", // KMeansSummary
+ "computeCost" // BisectingKMeansModel
)
def invokeMethodAllowed(obj: Object, methodName: String): Object = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]