You will need to use PySpark vectors to store them in a DataFrame. They can be created from NumPy arrays as follows:
from pyspark.ml.linalg import Vectors
import numpy as np

df = spark.createDataFrame(
    [("src1", "pkey1", 1, Vectors.dense(np.array([0, 1, 2])))])

On Wed, 28 Jun 2017 at 12:23 Judit Planas <judit.pla...@epfl.ch> wrote:
> Dear all,
>
> I am trying to store a NumPy array (loaded from an HDF5 dataset) into one
> cell of a DataFrame, but I am having problems.
>
> In short, my data layout is similar to a database: I have a few columns
> with metadata (source of information, primary key, etc.) and the last
> column contains a NumPy array (each array will have hundreds to thousands
> of elements):
>
> +--------+------------+--------------+------------------------+
> | src    | PrimaryKey | SecondaryKey | Data                   |
> +--------+------------+--------------+------------------------+
> | "src1" | "pkey1"    | 1            | np.array([0., 1., 2.]) |
> | "src2" | "pkey1"    | 2            | np.array([0., 1., 2.]) |
> +--------+------------+--------------+------------------------+
>
> In my case, it is important to keep the NumPy array as it is (no
> transformation into a Python list, etc.) because later on I will compute
> some statistics on the array, such as the average of its values. In
> addition, I expect to have thousands of rows (NumPy arrays), so I think
> exploding each array would generate too much duplicated metadata.
>
> I have successfully loaded the data I want into an RDD, keeping the NumPy
> array object as it is. However, I would like to use the DataFrame
> structure to leverage the SQL functions.
>
> What I have been able to achieve so far is to store the raw data of the
> NumPy array as follows:
> 1. Get the raw data of the NumPy array by calling "tobytes()" [
> https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.tobytes.html
> ]
> 2. Use "BinaryType" in the DataFrame schema for the NumPy array column
> 3. Call "np.frombuffer()" whenever I want to get the NumPy array back [
> https://docs.scipy.org/doc/numpy/reference/generated/numpy.frombuffer.html
> ]
>
> However, I feel this process is not optimal, and it consumes a lot of
> worker memory. For example, if my data size is around 3 GB:
> - Loading all data into a DF and calling the "cache()" method (within the
> same line) produces around 3 GB of memory consumed on the worker nodes.
> - However, loading all data into an RDD and calling the "cache()" method
> (within the same line) produces around 500 MB consumed on the worker
> nodes.
>
> From this, I understand that my data is highly compressible, so using an
> RDD is more memory-efficient than the DF ('spark.rdd.compress' is set to
> 'True' by default).
>
> In addition, what I see when I run queries on the data is that, in
> general, the RDD computes the query in less time than the DF. My
> hypothesis is the following: since data must be exchanged between worker
> nodes in order to perform the queries, the RDD takes less time because
> its data is compressed, so communication between workers takes less time.
>
> To summarize, I would like to use the DF structure due to its advantages
> (optimized scheduler, SQL support, etc.), but what I see from real
> performance measurements is that RDDs are much more efficient in my case
> (in both execution time and memory consumption). So, I wonder if there is
> a better way to store NumPy arrays in a DF so that I can benefit from its
> advantages while getting the same good performance as RDDs.
>
> Regarding the environment, my Spark version is 2.0.1 with Python 3.5.2,
> but I am not restricted to these versions. I am not tuning any special
> variables (using default values).
>
> Thanks in advance, and please let me know if I forgot to mention any
> detail or you need further information.
>
> Kind regards,
> Judit
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
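As a side note on the tobytes()/frombuffer() workaround described in steps 1-3 above, the round-trip can be sketched in pure NumPy (a minimal sketch with the Spark-specific parts omitted; note that frombuffer() needs the dtype, and for multi-dimensional arrays also the shape, since tobytes() discards both):

```python
import numpy as np

# The array as it would sit in the "Data" column
arr = np.array([0., 1., 2.])

# Step 1: serialize to raw bytes (the value stored in a BinaryType column)
raw = arr.tobytes()

# Step 3: rebuild the array from the buffer; the dtype must be supplied
# explicitly, because the raw bytes carry no type or shape information
restored = np.frombuffer(raw, dtype=arr.dtype)

print(np.array_equal(arr, restored))  # True
```

This is why storing the dtype (and shape) alongside the binary column, or fixing them by convention, is necessary for the reconstruction to be correct.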