[
https://issues.apache.org/jira/browse/SPARK-23810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
dciborow updated SPARK-23810:
-----------------------------
Priority: Minor (was: Major)
> Matrix multiplication is so slow that file I/O to local Python is faster
> ------------------------------------------------------------------------
>
> Key: SPARK-23810
> URL: https://issues.apache.org/jira/browse/SPARK-23810
> Project: Spark
> Issue Type: Bug
> Components: MLlib
> Affects Versions: 2.2.0
> Reporter: dciborow
> Priority: Minor
>
> I am trying to multiply two matrices. One is 130k by 30k; the second is 30k
> by 30k.
> Running the following leads to heartbeat timeouts, Java heap space errors, and
> garbage collection errors.
> {{rdd.toBlockMatrix.multiply(rightRdd.toBlockMatrix).toIndexedRowMatrix()}}
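> Spelled out, that one-liner corresponds to roughly the following (a minimal
> sketch; the entry-RDD names and the 1024x1024 block size are illustrative
> assumptions, not the exact values from my job).
>
> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
> import org.apache.spark.rdd.RDD
> val userEntries: RDD[MatrixEntry] = ??? // ~130k x 30k sparse entries
> val itemEntries: RDD[MatrixEntry] = ??? // ~30k x 30k sparse entries
> val left = new CoordinateMatrix(userEntries).toBlockMatrix(1024, 1024).cache()
> val right = new CoordinateMatrix(itemEntries).toBlockMatrix(1024, 1024).cache()
> // The multiply below is where the heartbeat timeouts and GC/heap errors occur.
> val product = left.multiply(right).toIndexedRowMatrix()
>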
> I have also tried the following, which fails on the {{toLocalMatrix}} call.
> val userMatrix = new CoordinateMatrix(userRDD).toIndexedRowMatrix()
> val itemMatrix = new CoordinateMatrix(itemRDD).toBlockMatrix().toLocalMatrix()
> val itemMatrixBC = session.sparkContext.broadcast(itemMatrix)
> val userToItemMatrix = userMatrix
>   .multiply(itemMatrixBC.value)
>   .rows.map(row => (row.index.toInt, row.vector))
>
> I instead have gotten this operation "working" by saving the input DataFrames
> to parquet (they start as DataFrames before the .rdd call needed to build the
> matrix types), loading them into python/pandas, using numpy for the matrix
> multiplication, saving the result back to parquet, and rereading it into Spark.
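> For reference, the export step that produces the two parquet inputs looks
> roughly like the following (a sketch; the DataFrame names itemsDF and usersDF
> are assumptions, while the column names match what the Python script below
> reads).
>
> itemsDF.select("itemID", "jaccardList").write.mode("overwrite").parquet("./items-parquet")
> usersDF.select("flatList").write.mode("overwrite").parquet("./users-parquet")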
>
> Python -
> import pandas as pd
> import numpy as np
>
> # Load the item matrix; each row holds an itemID and its vector (jaccardList).
> X = pd.read_parquet('./items-parquet', engine='pyarrow')
> Xp = pd.DataFrame(np.stack(X.jaccardList), index=X.itemID)
> # Reindex over the full itemID range so missing items become zero rows.
> Xrows = pd.DataFrame(index=range(0, X.itemID.max() + 1))
> Xpp = Xrows.join(Xp).fillna(0)
>
> # Load the user matrix and stack the per-user vectors into a 2-D array.
> Y = pd.read_parquet('./users-parquet', engine='pyarrow')
> Yp = np.stack(Y.flatList)
>
> # The multiplication Spark could not handle.
> Z = np.matmul(Yp, Xpp)
>
> # Package each output row as (id, list of ratings) and write back to parquet.
> Zp = pd.DataFrame(Z)
> Zp.columns = list(map(str, Zp.columns))
> Zpp = pd.DataFrame()
> Zpp['id'] = Zp.index
> Zpp['ratings'] = Zp.values.tolist()
> Zpp.to_parquet("sampleout.parquet", engine='pyarrow')
>
> Scala -
> import sys.process._
> // Shell out to the Python script, then read its parquet output back into Spark.
> val result = "python matmul.py".!
> val pythonOutput = userDataFrame.sparkSession.read.parquet("./sampleout.parquet")
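> To get the result back into the MLlib matrix types, one possible continuation
> is the following (a sketch; the conversion itself is an assumption, while the
> id and ratings columns come from the Python script above).
>
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
> val productMatrix = new IndexedRowMatrix(pythonOutput.rdd.map { r =>
>   IndexedRow(r.getAs[Long]("id"), Vectors.dense(r.getAs[Seq[Double]]("ratings").toArray))
> })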
>
> I can provide code and the data to reproduce, but could use some instructions
> on how to set that up. This is based on the MovieLens 20M dataset, or I can
> provide access to my data in Azure.
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]