Re: [PySpark]: How to store NumPy array into single DataFrame cell efficiently

2017-06-28 Thread Judit Planas

Dear Nick,

Thanks for your quick reply.

I quickly implemented your proposal, but I do not see any improvement. In
fact, the test data set of around 3 GB occupies a total of 10 GB in worker
memory, and queries run about 4 times slower than with the DF
implementation that stores the NP arrays as BinaryType.

Maybe I am doing something wrong, or is there something I am not taking
into account?

Thanks!

Judit

On 28/06/17 12:41, Nick Pentreath wrote:

> You will need to use PySpark vectors to store in a DataFrame. They can be
> created from Numpy arrays as follows:
>
> from pyspark.ml.linalg import Vectors
> df = spark.createDataFrame([("src1", "pkey1", 1, Vectors.dense(np.array([0, 1, 2])))])

Re: [PySpark]: How to store NumPy array into single DataFrame cell efficiently

2017-06-28 Thread Nick Pentreath
You will need to use PySpark vectors to store in a DataFrame. They can be
created from Numpy arrays as follows:

from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([("src1", "pkey1", 1, Vectors.dense(np.array([0, 1, 2])))])
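
A minimal sketch of going back the other way, assuming df is the DataFrame
created above: the stored DenseVector exposes a toArray() method that
returns the underlying NumPy array, so NumPy statistics still work on it.

# Minimal sketch: recover the NumPy array from the vector stored above.
row = df.first()          # columns are _1.._4 because no names were given
arr = row[3].toArray()    # DenseVector -> numpy.ndarray (float64)
print(arr.mean())         # per-row statistics, e.g. the average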


On Wed, 28 Jun 2017 at 12:23 Judit Planas wrote:

> Dear all,
>
> I am trying to store a NumPy array (loaded from an HDF5 dataset) into one
> cell of a DataFrame, but I am having problems.
>
> In short, my data layout is similar to a database, where I have a few
> columns with metadata (source of information, primary key, etc.) and the
> last column contains a NumPy array (each array will have hundreds to
> thousands of elements):
> +--------+------------+--------------+------------------------+
> |    src | PrimaryKey | SecondaryKey |                   Data |
> +--------+------------+--------------+------------------------+
> | "src1" |    "pkey1" |            1 | np.array([0., 1., 2.]) |
> | "src2" |    "pkey1" |            2 | np.array([0., 1., 2.]) |
> +--------+------------+--------------+------------------------+
>
> In my case, it is important to keep the NumPy array as it is (no
> transformation into Python list, etc.) because later on I will compute some
> statistics on the array, like the average of values. In addition, I expect
> to have thousands of rows (NumPy arrays), so I think trying to explode each
> array will generate too much duplicated metadata.
>
> I have been able to successfully load the data that I want into an RDD,
> keeping the NumPy array objects as they are. But I would like to use the
> DataFrame structure to leverage the SQL functions.
>
> What I have been able to achieve so far is to store the raw data of the NP
> array doing the following:
> 1. Get the raw data of the NP array by calling "tobytes()"
>    [https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.tobytes.html]
> 2. Use "BinaryType" in the DF schema for the NP array
> 3. Call "np.frombuffer()" whenever I want to get the NP array back
>    [https://docs.scipy.org/doc/numpy/reference/generated/numpy.frombuffer.html]
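
A minimal sketch of those three steps, assuming the column names from the
example table above and a SparkSession named spark; the dtype passed to
np.frombuffer is an assumption that must match the dtype of the original
array, since the raw bytes do not carry it:

import numpy as np
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, BinaryType)

# Steps 1-2: serialize the array with tobytes() and store it in a BinaryType column.
schema = StructType([
    StructField("src", StringType()),
    StructField("PrimaryKey", StringType()),
    StructField("SecondaryKey", IntegerType()),
    StructField("Data", BinaryType()),
])
arr = np.array([0., 1., 2.])
df = spark.createDataFrame([("src1", "pkey1", 1, bytearray(arr.tobytes()))], schema)

# Step 3: rebuild the NumPy array from the raw bytes when it is needed again.
raw = df.first()["Data"]
restored = np.frombuffer(raw, dtype=np.float64)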
>
> However, I feel this process is not optimal and it consumes a lot of
> worker memory. For example, if my data size is around 3 GB:
> - Loading all data into a DF and calling the "cache()" method (within the
> same line) produces around 3 GB of memory consumed on the worker nodes.
> - However, loading all data into an RDD and calling the "cache()" method
> (within the same line) produces around 500 MB of memory consumed on the
> worker nodes.
>
> From this, I understand that my data is highly compressible, so using an
> RDD is more memory-efficient than the DF ('spark.rdd.compress' is set to
> 'True' by default).
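
For reference, both cache paths have a compression setting that can be
inspected at runtime; a small sketch, assuming spark is the active
SparkSession:

# Compression of serialized RDD partitions when they are cached:
print(spark.sparkContext.getConf().get("spark.rdd.compress", "(not set)"))
# Compression of the in-memory columnar cache used when a DataFrame is cached:
print(spark.conf.get("spark.sql.inMemoryColumnarStorage.compressed"))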
>
> In addition, what I see when I run queries on the data is that, in
> general, the RDD computes the query in less time than the DF. My hypothesis
> here is the following: since data must be exchanged between worker nodes in
> order to perform the queries, the RDD takes less time because data is
> compressed, so communication between workers takes less time.
>
> To summarize, I would like to use the DF structure due to its advantages
> (optimized scheduler, SQL support, etc.), but what I see from real
> performance measurements is that RDDs are much more efficient in my case
> (both execution time and memory consumption). So, I wonder if there is a
> better way to store NP arrays into a DF so that I can benefit from their
> advantages but at the same time they show the same good performance as RDDs.
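
Not an answer to the storage question itself, but a sketch of how the
per-row statistics mentioned above (e.g. the average) can still go through
the DataFrame API with the BinaryType layout; the "Data" column name and
the float64 dtype are assumptions:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Per-row mean computed from the raw bytes stored in the (assumed) "Data" column;
# the dtype must match the one used when calling tobytes().
array_mean = udf(lambda raw: float(np.frombuffer(raw, dtype=np.float64).mean()),
                 DoubleType())
df.withColumn("DataMean", array_mean(df["Data"])).show()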
>
> Regarding the environment, my Spark version is 2.0.1 with Python 3.5.2,
> but I am not restricted to use these versions. I am not tuning any special
> variable (using default values).
>
> Thanks in advance, and please, let me know if I forgot to mention any
> detail or you need further information.
>
> Kind regards,
> Judit