Dear Nick,

Thanks for your quick reply.

I quickly implemented your proposal, but I do not see any improvement. In fact, the test data set of around 3 GB occupies a total of 10 GB in worker memory, and queries run about 4 times slower than with the DF implementation that stores the NP arrays as BinaryType.

Maybe I am doing something wrong, or there is something I am not taking into account?

Thanks!

Judit

On 28/06/17 12:41, Nick Pentreath wrote:
You will need to use PySpark vectors to store in a DataFrame. They can be created from Numpy arrays as follows:

import numpy as np
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([("src1", "pkey1", 1, Vectors.dense(np.array([0, 1, 2])))])


On Wed, 28 Jun 2017 at 12:23 Judit Planas <judit.pla...@epfl.ch> wrote:
Dear all,

I am trying to store a NumPy array (loaded from an HDF5 dataset) into one cell of a DataFrame, but I am having problems.

In short, my data layout is similar to a database, where I have a few columns with metadata (source of information, primary key, etc.) and the last column contains a NumPy array (each array will have hundreds to thousands of elements):
+--------+-----------+-------------+-----------------------+
|     src| PrimaryKey| SecondaryKey|                   Data|
+--------+-----------+-------------+-----------------------+
|  "src1"|    "pkey1"|            1| np.array([0., 1., 2.])|
|  "src2"|    "pkey1"|            2| np.array([0., 1., 2.])|
+--------+-----------+-------------+-----------------------+

In my case, it is important to keep the NumPy array as it is (no transformation into Python list, etc.) because later on I will compute some statistics on the array, like the average of values. In addition, I expect to have thousands of rows (NumPy arrays), so I think trying to explode each array will generate too much duplicated metadata.

I have successfully loaded the data that I want into an RDD, using the NumPy array object as it is. But I would like to use the DataFrame structure to take advantage of the SQL functions.

What I have been able to achieve so far is to store the raw data of the NP array doing the following:
1. Get the raw data of the NP array by calling "tobytes()" [https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.tobytes.html]
2. Use "BinaryType" in the DF schema for the NP array
3. Call "np.frombuffer()" whenever I want to get the NP array back [https://docs.scipy.org/doc/numpy/reference/generated/numpy.frombuffer.html]
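
The three steps above can be sketched with NumPy alone. This is a minimal illustration, not the exact code from my application: the helper names (array_to_bytes, bytes_to_array) are made up for the example, and it assumes 1-D float64 arrays, since the raw bytes carry neither dtype nor shape.

```python
import numpy as np

def array_to_bytes(arr):
    """Step 1: raw bytes of the array; this is the value stored in the BinaryType column."""
    # Assumption: all arrays are 1-D float64, so dtype/shape need not be stored separately.
    return np.ascontiguousarray(arr, dtype=np.float64).tobytes()

def bytes_to_array(raw):
    """Step 3: rebuild the array; the dtype must match the one used when serializing."""
    # np.frombuffer creates a view over the buffer without copying the data.
    return np.frombuffer(raw, dtype=np.float64)

data = np.array([0., 1., 2.])
raw = array_to_bytes(data)        # 3 elements * 8 bytes each
restored = bytes_to_array(raw)
mean_value = restored.mean()      # statistics are computed on the restored array
```

Note that an array restored from an immutable bytes object is read-only, which is fine for computing statistics such as the average.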

However, I feel this process is not optimal and it consumes a lot of worker memory. For example, if my data size is around 3 GB:
- Loading all data into a DF and calling the "cache()" method (within the same line) consumes around 3 GB of memory on the worker nodes.
- However, loading all data into an RDD and calling the "cache()" method (within the same line) consumes only around 500 MB of memory on the worker nodes.

From this, I understand that my data is highly compressible, so using an RDD is more memory-efficient than the DF ('spark.rdd.compress' is set to 'True' by default).
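
A rough way to check the compressibility assumption outside Spark is to compress the raw array bytes directly. The sketch below is only indicative: it uses zlib from the Python standard library as a stand-in for Spark's own compression codec, and the two arrays are synthetic examples, not my real data.

```python
import zlib
import numpy as np

def compression_ratio(arr):
    """Return uncompressed size divided by zlib-compressed size of the raw bytes."""
    raw = arr.tobytes()
    return len(raw) / len(zlib.compress(raw))

# Synthetic examples (assumptions for illustration only):
repetitive = np.zeros(100000, dtype=np.float64)    # highly regular values
noisy = np.random.default_rng(0).random(100000)    # random values, little redundancy

ratio_repetitive = compression_ratio(repetitive)
ratio_noisy = compression_ratio(noisy)
print("repetitive:", ratio_repetitive, "noisy:", ratio_noisy)
```

If real arrays behave more like the repetitive case, a large gap between compressed and uncompressed cache sizes is to be expected.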

In addition, what I see when I run queries on the data is that, in general, the RDD computes the query in less time than the DF. My hypothesis here is the following: since data must be exchanged between worker nodes in order to perform the queries, the RDD takes less time because data is compressed, so communication between workers takes less time.

To summarize, I would like to use the DF structure due to its advantages (optimized scheduler, SQL support, etc.), but what I see from real performance measurements is that RDDs are much more efficient in my case (both in execution time and in memory consumption). So, I wonder if there is a better way to store NP arrays in a DF, so that I can benefit from the DF advantages while getting performance comparable to that of RDDs.

Regarding the environment, my Spark version is 2.0.1 with Python 3.5.2, but I am not restricted to these versions. I am not tuning any configuration variables (all defaults).

Thanks in advance, and please, let me know if I forgot to mention any detail or you need further information.

Kind regards,
Judit
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
