Repository: spark
Updated Branches:
  refs/heads/master 59f9c1bd1 -> fdde7d0aa
[SPARK-16348][ML][MLLIB][PYTHON] Use full classpaths for pyspark ML JVM calls

## What changes were proposed in this pull request?

Issue: Omitting the full classpath can cause problems when calling JVM methods or classes from pyspark.

This PR: Changed all uses of jvm.X in pyspark.ml and pyspark.mllib to use full classpath for X

## How was this patch tested?

Existing unit tests. Manual testing in an environment where this was an issue.

Author: Joseph K. Bradley <jos...@databricks.com>

Closes #14023 from jkbradley/SPARK-16348.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdde7d0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdde7d0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdde7d0a

Branch: refs/heads/master
Commit: fdde7d0aa0ef69d0e9a88cf712601bba1d5b0706
Parents: 59f9c1b
Author: Joseph K. Bradley <jos...@databricks.com>
Authored: Tue Jul 5 17:00:24 2016 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Tue Jul 5 17:00:24 2016 -0700

----------------------------------------------------------------------
 python/pyspark/ml/common.py            | 10 +++++-----
 python/pyspark/ml/tests.py             |  8 ++++----
 python/pyspark/mllib/clustering.py     |  5 +++--
 python/pyspark/mllib/common.py         | 10 +++++-----
 python/pyspark/mllib/feature.py        |  2 +-
 python/pyspark/mllib/fpm.py            |  2 +-
 python/pyspark/mllib/recommendation.py |  2 +-
 python/pyspark/mllib/tests.py          | 15 ++++++++-------
 8 files changed, 28 insertions(+), 26 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/ml/common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/common.py b/python/pyspark/ml/common.py
index 256e91e..7d449aa 100644
--- a/python/pyspark/ml/common.py
+++ b/python/pyspark/ml/common.py
@@ -63,7 +63,7 @@ def _to_java_object_rdd(rdd): RDD is serialized in
batch or not. """ rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer())) - return rdd.ctx._jvm.MLSerDe.pythonToJava(rdd._jrdd, True) + return rdd.ctx._jvm.org.apache.spark.ml.python.MLSerDe.pythonToJava(rdd._jrdd, True) def _py2java(sc, obj): @@ -82,7 +82,7 @@ def _py2java(sc, obj): pass else: data = bytearray(PickleSerializer().dumps(obj)) - obj = sc._jvm.MLSerDe.loads(data) + obj = sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(data) return obj @@ -95,17 +95,17 @@ def _java2py(sc, r, encoding="bytes"): clsName = 'JavaRDD' if clsName == 'JavaRDD': - jrdd = sc._jvm.MLSerDe.javaToPython(r) + jrdd = sc._jvm.org.apache.spark.ml.python.MLSerDe.javaToPython(r) return RDD(jrdd, sc) if clsName == 'Dataset': return DataFrame(r, SQLContext.getOrCreate(sc)) if clsName in _picklable_classes: - r = sc._jvm.MLSerDe.dumps(r) + r = sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(r) elif isinstance(r, (JavaArray, JavaList)): try: - r = sc._jvm.MLSerDe.dumps(r) + r = sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(r) except Py4JJavaError: pass # not pickable http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/ml/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 981ed9d..24efce8 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1195,12 +1195,12 @@ class VectorTests(MLlibTestCase): def _test_serialize(self, v): self.assertEqual(v, ser.loads(ser.dumps(v))) - jvec = self.sc._jvm.MLSerDe.loads(bytearray(ser.dumps(v))) - nv = ser.loads(bytes(self.sc._jvm.MLSerDe.dumps(jvec))) + jvec = self.sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(bytearray(ser.dumps(v))) + nv = ser.loads(bytes(self.sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(jvec))) self.assertEqual(v, nv) vs = [v] * 100 - jvecs = self.sc._jvm.MLSerDe.loads(bytearray(ser.dumps(vs))) - nvs = ser.loads(bytes(self.sc._jvm.MLSerDe.dumps(jvecs))) + jvecs = 
self.sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(bytearray(ser.dumps(vs))) + nvs = ser.loads(bytes(self.sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(jvecs))) self.assertEqual(vs, nvs) def test_serialize(self): http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/mllib/clustering.py ---------------------------------------------------------------------- diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 95f7278..93a0b64 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -507,7 +507,7 @@ class GaussianMixtureModel(JavaModelWrapper, JavaSaveable, JavaLoader): Path to where the model is stored. """ model = cls._load_java(sc, path) - wrapper = sc._jvm.GaussianMixtureModelWrapper(model) + wrapper = sc._jvm.org.apache.spark.mllib.api.python.GaussianMixtureModelWrapper(model) return cls(wrapper) @@ -638,7 +638,8 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader): Load a model from the given path. """ model = cls._load_java(sc, path) - wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model) + wrapper =\ + sc._jvm.org.apache.spark.mllib.api.python.PowerIterationClusteringModelWrapper(model) return PowerIterationClusteringModel(wrapper) http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/mllib/common.py ---------------------------------------------------------------------- diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py index 31afdf5..21f0e09 100644 --- a/python/pyspark/mllib/common.py +++ b/python/pyspark/mllib/common.py @@ -66,7 +66,7 @@ def _to_java_object_rdd(rdd): RDD is serialized in batch or not. 
""" rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer())) - return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True) + return rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe.pythonToJava(rdd._jrdd, True) def _py2java(sc, obj): @@ -85,7 +85,7 @@ def _py2java(sc, obj): pass else: data = bytearray(PickleSerializer().dumps(obj)) - obj = sc._jvm.SerDe.loads(data) + obj = sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(data) return obj @@ -98,17 +98,17 @@ def _java2py(sc, r, encoding="bytes"): clsName = 'JavaRDD' if clsName == 'JavaRDD': - jrdd = sc._jvm.SerDe.javaToPython(r) + jrdd = sc._jvm.org.apache.spark.mllib.api.python.SerDe.javaToPython(r) return RDD(jrdd, sc) if clsName == 'Dataset': return DataFrame(r, SQLContext.getOrCreate(sc)) if clsName in _picklable_classes: - r = sc._jvm.SerDe.dumps(r) + r = sc._jvm.org.apache.spark.mllib.api.python.SerDe.dumps(r) elif isinstance(r, (JavaArray, JavaList)): try: - r = sc._jvm.SerDe.dumps(r) + r = sc._jvm.org.apache.spark.mllib.api.python.SerDe.dumps(r) except Py4JJavaError: pass # not pickable http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/mllib/feature.py ---------------------------------------------------------------------- diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index e31c75c..aef91a8 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -553,7 +553,7 @@ class Word2VecModel(JavaVectorTransformer, JavaSaveable, JavaLoader): """ jmodel = sc._jvm.org.apache.spark.mllib.feature \ .Word2VecModel.load(sc._jsc.sc(), path) - model = sc._jvm.Word2VecModelWrapper(jmodel) + model = sc._jvm.org.apache.spark.mllib.api.python.Word2VecModelWrapper(jmodel) return Word2VecModel(model) http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/mllib/fpm.py ---------------------------------------------------------------------- diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py index 
ab4066f..fb226e8 100644 --- a/python/pyspark/mllib/fpm.py +++ b/python/pyspark/mllib/fpm.py @@ -64,7 +64,7 @@ class FPGrowthModel(JavaModelWrapper, JavaSaveable, JavaLoader): Load a model from the given path. """ model = cls._load_java(sc, path) - wrapper = sc._jvm.FPGrowthModelWrapper(model) + wrapper = sc._jvm.org.apache.spark.mllib.api.python.FPGrowthModelWrapper(model) return FPGrowthModel(wrapper) http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/mllib/recommendation.py ---------------------------------------------------------------------- diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 7e60255..732300e 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -207,7 +207,7 @@ class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader): def load(cls, sc, path): """Load a model from the given path""" model = cls._load_java(sc, path) - wrapper = sc._jvm.MatrixFactorizationModelWrapper(model) + wrapper = sc._jvm.org.apache.spark.mllib.api.python.MatrixFactorizationModelWrapper(model) return MatrixFactorizationModel(wrapper) http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/mllib/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 72fa8b5..99bf50b 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -150,12 +150,12 @@ class VectorTests(MLlibTestCase): def _test_serialize(self, v): self.assertEqual(v, ser.loads(ser.dumps(v))) - jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v))) - nv = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jvec))) + jvec = self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(bytearray(ser.dumps(v))) + nv = ser.loads(bytes(self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.dumps(jvec))) self.assertEqual(v, nv) vs = [v] * 100 - jvecs = 
self.sc._jvm.SerDe.loads(bytearray(ser.dumps(vs))) - nvs = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jvecs))) + jvecs = self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(bytearray(ser.dumps(vs))) + nvs = ser.loads(bytes(self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.dumps(jvecs))) self.assertEqual(vs, nvs) def test_serialize(self): @@ -1650,8 +1650,8 @@ class ALSTests(MLlibTestCase): def test_als_ratings_serialize(self): r = Rating(7, 1123, 3.14) - jr = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(r))) - nr = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jr))) + jr = self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(bytearray(ser.dumps(r))) + nr = ser.loads(bytes(self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.dumps(jr))) self.assertEqual(r.user, nr.user) self.assertEqual(r.product, nr.product) self.assertAlmostEqual(r.rating, nr.rating, 2) @@ -1659,7 +1659,8 @@ class ALSTests(MLlibTestCase): def test_als_ratings_id_long_error(self): r = Rating(1205640308657491975, 50233468418, 1.0) # rating user id exceeds max int value, should fail when pickled - self.assertRaises(Py4JJavaError, self.sc._jvm.SerDe.loads, bytearray(ser.dumps(r))) + self.assertRaises(Py4JJavaError, self.sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads, + bytearray(ser.dumps(r))) class HashingTFTest(MLlibTestCase): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org