This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0e4242d4b2e1 [SPARK-50923][SPARK-50927][ML][PYTHON][CONNECT] Support FMClassifier and FMRegressor on Connect
0e4242d4b2e1 is described below
commit 0e4242d4b2e19e52c5f7bb2b84ef6c55ee6fa9c5
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Jan 27 17:59:22 2025 +0800
[SPARK-50923][SPARK-50927][ML][PYTHON][CONNECT] Support FMClassifier and FMRegressor on Connect
### What changes were proposed in this pull request?
Support FMClassifier and FMRegressor on Connect
### Why are the changes needed?
For parity: so that FMClassifier and FMRegressor are also supported on Connect.
### Does this PR introduce _any_ user-facing change?
Yes, FMClassifier and FMRegressor are now supported on Connect.
### How was this patch tested?
Added tests in python/pyspark/ml/tests/test_classification.py and python/pyspark/ml/tests/test_regression.py.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #49685 from zhengruifeng/ml_connect_fm.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../services/org.apache.spark.ml.Estimator | 2 +
.../services/org.apache.spark.ml.Transformer | 2 +
.../spark/ml/classification/FMClassifier.scala | 3 +
.../apache/spark/ml/regression/FMRegressor.scala | 3 +
python/pyspark/ml/tests/test_classification.py | 102 +++++++++++++++++++++
python/pyspark/ml/tests/test_regression.py | 65 +++++++++++++
.../org/apache/spark/sql/connect/ml/MLUtils.scala | 2 +
7 files changed, 179 insertions(+)
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
index 595355b0c1e4..61338f561868 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
@@ -22,6 +22,7 @@
org.apache.spark.ml.classification.NaiveBayes
org.apache.spark.ml.classification.LinearSVC
org.apache.spark.ml.classification.LogisticRegression
+org.apache.spark.ml.classification.FMClassifier
org.apache.spark.ml.classification.MultilayerPerceptronClassifier
org.apache.spark.ml.classification.DecisionTreeClassifier
org.apache.spark.ml.classification.RandomForestClassifier
@@ -32,6 +33,7 @@ org.apache.spark.ml.regression.AFTSurvivalRegression
org.apache.spark.ml.regression.IsotonicRegression
org.apache.spark.ml.regression.LinearRegression
org.apache.spark.ml.regression.GeneralizedLinearRegression
+org.apache.spark.ml.regression.FMRegressor
org.apache.spark.ml.regression.DecisionTreeRegressor
org.apache.spark.ml.regression.RandomForestRegressor
org.apache.spark.ml.regression.GBTRegressor
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
index 0375bac51d39..04cde68ec806 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
@@ -38,6 +38,7 @@ org.apache.spark.ml.feature.HashingTF
org.apache.spark.ml.classification.NaiveBayesModel
org.apache.spark.ml.classification.LinearSVCModel
org.apache.spark.ml.classification.LogisticRegressionModel
+org.apache.spark.ml.classification.FMClassificationModel
org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel
org.apache.spark.ml.classification.DecisionTreeClassificationModel
org.apache.spark.ml.classification.RandomForestClassificationModel
@@ -48,6 +49,7 @@ org.apache.spark.ml.regression.AFTSurvivalRegressionModel
org.apache.spark.ml.regression.IsotonicRegressionModel
org.apache.spark.ml.regression.LinearRegressionModel
org.apache.spark.ml.regression.GeneralizedLinearRegressionModel
+org.apache.spark.ml.regression.FMRegressionModel
org.apache.spark.ml.regression.DecisionTreeRegressionModel
org.apache.spark.ml.regression.RandomForestRegressionModel
org.apache.spark.ml.regression.GBTRegressionModel
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
index 33e7c1fdd5e0..0ef16cb42776 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala
@@ -259,6 +259,9 @@ class FMClassificationModel private[classification] (
with FMClassifierParams with MLWritable
with HasTrainingSummary[FMClassificationTrainingSummary]{
+ private[ml] def this() = this(Identifiable.randomUID("fmc"),
+ Double.NaN, Vectors.empty, Matrices.empty)
+
@Since("3.0.0")
override val numClasses: Int = 2
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
index 182107a443c1..02ef1df2c44e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala
@@ -461,6 +461,9 @@ class FMRegressionModel private[regression] (
extends RegressionModel[Vector, FMRegressionModel]
with FMRegressorParams with MLWritable {
+ private[ml] def this() = this(Identifiable.randomUID("fmr"),
+ Double.NaN, Vectors.empty, Matrices.empty)
+
@Since("3.0.0")
override val numFeatures: Int = linear.size
diff --git a/python/pyspark/ml/tests/test_classification.py b/python/pyspark/ml/tests/test_classification.py
index bea622db9079..0bcf680933ef 100644
--- a/python/pyspark/ml/tests/test_classification.py
+++ b/python/pyspark/ml/tests/test_classification.py
@@ -34,6 +34,10 @@ from pyspark.ml.classification import (
LogisticRegressionModel,
LogisticRegressionSummary,
BinaryLogisticRegressionSummary,
+ FMClassifier,
+ FMClassificationModel,
+ FMClassificationSummary,
+ FMClassificationTrainingSummary,
DecisionTreeClassifier,
DecisionTreeClassificationModel,
RandomForestClassifier,
@@ -447,6 +451,104 @@ class ClassificationTestsMixin:
model2 = LinearSVCModel.load(d)
self.assertEqual(str(model), str(model2))
+ def test_factorization_machine(self):
+ spark = self.spark
+ df = (
+ spark.createDataFrame(
+ [
+ (1.0, 1.0, Vectors.dense(0.0, 5.0)),
+ (0.0, 2.0, Vectors.dense(1.0, 2.0)),
+ (1.0, 3.0, Vectors.dense(2.0, 1.0)),
+ (0.0, 4.0, Vectors.dense(3.0, 3.0)),
+ ],
+ ["label", "weight", "features"],
+ )
+ .coalesce(1)
+ .sortWithinPartitions("weight")
+ )
+
+ fm = FMClassifier(factorSize=2, maxIter=1, regParam=1.0, seed=1)
+ self.assertEqual(fm.getFactorSize(), 2)
+ self.assertEqual(fm.getMaxIter(), 1)
+ self.assertEqual(fm.getRegParam(), 1.0)
+ self.assertEqual(fm.getSeed(), 1)
+
+ model = fm.fit(df)
+ self.assertEqual(fm.uid, model.uid)
+ self.assertEqual(model.numClasses, 2)
+ self.assertEqual(model.numFeatures, 2)
+ self.assertTrue(
+ np.allclose(model.intercept, 0.9999070647126924, atol=1e-4), model.intercept
+ )
+ self.assertTrue(
+ np.allclose(
+ model.linear.toArray(), [-0.999999959956255, 0.9999999201744205], atol=1e-4
+ ),
+ model.linear,
+ )
+ self.assertTrue(
+ np.allclose(
+ model.factors.toArray(),
+ [[0.99999918, 0.99999858], [-0.99999943, 0.99999854]],
+ atol=1e-4,
+ ),
+ model.factors,
+ )
+
+ vec = Vectors.dense(0.0, 5.0)
+ pred = model.predict(vec)
+ self.assertEqual(pred, 1.0)
+ pred = model.predictRaw(vec)
+ self.assertTrue(
+ np.allclose(pred.toArray(), [-5.9999066655847955, 5.9999066655847955], atol=1e-4),
+ pred,
+ )
+ pred = model.predictProbability(vec)
+ self.assertTrue(
+ np.allclose(pred.toArray(), [0.002472853377527451, 0.9975271466224725], atol=1e-4),
+ pred,
+ )
+
+ output = model.transform(df)
+ expected_cols = [
+ "label",
+ "weight",
+ "features",
+ "rawPrediction",
+ "probability",
+ "prediction",
+ ]
+ self.assertEqual(output.columns, expected_cols)
+ self.assertEqual(output.count(), 4)
+
+ # model summary
+ self.assertTrue(model.hasSummary)
+ summary = model.summary()
+ self.assertIsInstance(summary, FMClassificationSummary)
+ self.assertIsInstance(summary, FMClassificationTrainingSummary)
+ self.assertEqual(summary.labels, [0.0, 1.0])
+ self.assertEqual(summary.accuracy, 0.25)
+ self.assertEqual(summary.areaUnderROC, 0.5)
+ self.assertEqual(summary.predictions.columns, expected_cols)
+
+ summary2 = model.evaluate(df)
+ self.assertIsInstance(summary2, FMClassificationSummary)
+ self.assertFalse(isinstance(summary2, FMClassificationTrainingSummary))
+ self.assertEqual(summary2.labels, [0.0, 1.0])
+ self.assertEqual(summary2.accuracy, 0.25)
+ self.assertEqual(summary2.areaUnderROC, 0.5)
+ self.assertEqual(summary2.predictions.columns, expected_cols)
+
+ # Model save & load
+ with tempfile.TemporaryDirectory(prefix="factorization_machine") as d:
+ fm.write().overwrite().save(d)
+ fm2 = FMClassifier.load(d)
+ self.assertEqual(str(fm), str(fm2))
+
+ model.write().overwrite().save(d)
+ model2 = FMClassificationModel.load(d)
+ self.assertEqual(str(model), str(model2))
+
def test_decision_tree_classifier(self):
df = (
self.spark.createDataFrame(
diff --git a/python/pyspark/ml/tests/test_regression.py b/python/pyspark/ml/tests/test_regression.py
index 322a3d70f9e9..f07f75ebeb6c 100644
--- a/python/pyspark/ml/tests/test_regression.py
+++ b/python/pyspark/ml/tests/test_regression.py
@@ -35,6 +35,8 @@ from pyspark.ml.regression import (
GeneralizedLinearRegressionTrainingSummary,
LinearRegressionSummary,
LinearRegressionTrainingSummary,
+ FMRegressor,
+ FMRegressionModel,
DecisionTreeRegressor,
DecisionTreeRegressionModel,
RandomForestRegressor,
@@ -368,6 +370,69 @@ class RegressionTestsMixin:
model2 = GeneralizedLinearRegressionModel.load(d)
self.assertEqual(str(model), str(model2))
+ def test_factorization_machine(self):
+ spark = self.spark
+ df = (
+ spark.createDataFrame(
+ [
+ (1, 1.0, Vectors.dense(0.0, 0.0)),
+ (2, 1.0, Vectors.dense(1.0, 2.0)),
+ (3, 2.0, Vectors.dense(0.0, 0.0)),
+ (4, 2.0, Vectors.dense(1.0, 1.0)),
+ ],
+ ["index", "label", "features"],
+ )
+ .coalesce(1)
+ .sortWithinPartitions("index")
+ .select("label", "features")
+ )
+
+ fm = FMRegressor(factorSize=2, maxIter=1, regParam=1.0, seed=1)
+ self.assertEqual(fm.getFactorSize(), 2)
+ self.assertEqual(fm.getMaxIter(), 1)
+ self.assertEqual(fm.getRegParam(), 1.0)
+ self.assertEqual(fm.getSeed(), 1)
+
+ model = fm.fit(df)
+ self.assertEqual(fm.uid, model.uid)
+ self.assertEqual(model.numFeatures, 2)
+ self.assertTrue(
+ np.allclose(model.intercept, 0.9999999966668874, atol=1e-4), model.intercept
+ )
+ self.assertTrue(
+ np.allclose(
+ model.linear.toArray(), [0.9999999933342161, 0.9999999950008276], atol=1e-4
+ ),
+ model.linear,
+ )
+ self.assertTrue(
+ np.allclose(
+ model.factors.toArray(),
+ [[-0.99999954, -0.9999992], [0.99999968, -0.99999918]],
+ atol=1e-4,
+ ),
+ model.factors,
+ )
+
+ vec = Vectors.dense(0.0, 5.0)
+ pred = model.predict(vec)
+ self.assertTrue(np.allclose(pred, 5.999999971671025, atol=1e-4), pred)
+
+ output = model.transform(df)
+ expected_cols = ["label", "features", "prediction"]
+ self.assertEqual(output.columns, expected_cols)
+ self.assertEqual(output.count(), 4)
+
+ # Model save & load
+ with tempfile.TemporaryDirectory(prefix="factorization_machine") as d:
+ fm.write().overwrite().save(d)
+ fm2 = FMRegressor.load(d)
+ self.assertEqual(str(fm), str(fm2))
+
+ model.write().overwrite().save(d)
+ model2 = FMRegressionModel.load(d)
+ self.assertEqual(str(model), str(model2))
+
def test_decision_tree_regressor(self):
df = self.df
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
index 38181590484b..56526b7e6737 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
@@ -530,6 +530,7 @@ private[ml] object MLUtils {
Set("intercept", "coefficients", "interceptVector", "coefficientMatrix", "evaluate")),
(classOf[LogisticRegressionSummary], Set("probabilityCol", "featuresCol")),
(classOf[BinaryLogisticRegressionSummary], Set("scoreCol")),
+ (classOf[FMClassificationModel], Set("intercept", "linear", "factors", "evaluate")),
(classOf[MultilayerPerceptronClassificationModel], Set("weights", "evaluate")),
// Regression Models
@@ -589,6 +590,7 @@ private[ml] object MLUtils {
"tValues",
"pValues")),
(classOf[LinearRegressionTrainingSummary], Set("objectiveHistory", "totalIterations")),
+ (classOf[FMRegressionModel], Set("intercept", "linear", "factors")),
// Clustering Models
(classOf[KMeansModel], Set("predict", "numFeatures", "clusterCenterMatrix")),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]