Dear all,
I am trying to store a NumPy array (loaded from an HDF5
dataset) into one cell of a DataFrame, but I am having
problems.
In short, my data layout is similar to a database, where
I have a few columns with metadata (source of
information, primary key, etc.) and the last column
contains a NumPy array (each array will have hundreds to
thousands of elements):
+--------+------------+--------------+------------------------+
| src    | PrimaryKey | SecondaryKey | Data                   |
+--------+------------+--------------+------------------------+
| "src1" | "pkey1"    | 1            | np.array([0., 1., 2.]) |
| "src2" | "pkey1"    | 2            | np.array([0., 1., 2.]) |
+--------+------------+--------------+------------------------+
In my case, it is important to keep the NumPy array as
it is (no transformation into Python list, etc.) because
later on I will compute some statistics on the array,
like the average of values. In addition, I expect to
have thousands of rows (NumPy arrays), so I think trying
to explode each array will generate too much duplicated
metadata.
I have been able to load the data that I want into an
RDD, keeping each NumPy array object as it is. However, I
would like to use the DataFrame structure to take
advantage of the SQL functions.
What I have been able to achieve so far is to store the
raw data of the NP array by doing the following:
1. Get the raw data of the NP array by calling "tobytes()"
   [https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.tobytes.html]
2. Use "BinaryType" in the DF schema for the NP array
3. Call "np.frombuffer()" whenever I want to get the NP
   array back
   [https://docs.scipy.org/doc/numpy/reference/generated/numpy.frombuffer.html]
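The three steps above can be sketched roughly as follows.
This is only an illustration under stated assumptions: the
arrays are taken to be one-dimensional float64, the helper
names (array_to_bytes, bytes_to_array, build_dataframe) are
hypothetical, and "spark" is assumed to be an existing
SparkSession. Note that "tobytes()" keeps neither the dtype
nor the shape, so both must be known when reading back.

```python
import numpy as np


def array_to_bytes(arr):
    """Step 1: copy the array's raw buffer (dtype and shape are not stored)."""
    return arr.tobytes()


def bytes_to_array(raw, dtype=np.float64):
    """Step 3: rebuild a 1-D array from raw bytes.

    The dtype must match the one used when writing (float64 is assumed here).
    The result is read-only when `raw` is an immutable `bytes` object.
    """
    return np.frombuffer(raw, dtype=dtype)


def build_dataframe(spark, rows):
    """Step 2: build a DataFrame whose last column is BinaryType.

    `spark` is assumed to be an existing SparkSession; `rows` is an iterable
    of (src, primary_key, secondary_key, ndarray) tuples.
    """
    # Imported lazily so the NumPy helpers work without a Spark installation.
    from pyspark.sql.types import (BinaryType, IntegerType, StringType,
                                   StructField, StructType)

    schema = StructType([
        StructField("src", StringType(), False),
        StructField("PrimaryKey", StringType(), False),
        StructField("SecondaryKey", IntegerType(), False),
        StructField("Data", BinaryType(), False),
    ])
    # bytearray is used because PySpark accepts it for BinaryType columns.
    data = [(src, pk, sk, bytearray(array_to_bytes(arr)))
            for src, pk, sk, arr in rows]
    return spark.createDataFrame(data, schema)
```

For example, build_dataframe(spark, [("src1", "pkey1", 1,
np.array([0., 1., 2.]))]) would give one row, and calling
bytes_to_array() on its "Data" value gives the array back.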
However, I feel this process is not optimal and it
consumes a lot of worker memory. For example, if my data
size is around 3 GB:
- Loading all data into a DF and calling the "cache()"
method (within the same line) results in around 3 GB of
memory consumed on the worker nodes.
- However, loading all data into an RDD and calling the
"cache()" method (within the same line) results in around
500 MB of memory consumed on the worker nodes.
From this, I understand that my data is highly
compressible, so using an RDD is more memory-efficient
than the DF ('spark.rdd.compress' is set to 'True' by
default).
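A rough way to check how compressible the array bytes are,
independently of Spark, is sketched below. It is only an
illustration: zlib is used as a stand-in (Spark applies its
own codec, configured through 'spark.io.compression.codec'),
the helper name compression_ratio is hypothetical, and the
two sample arrays are made up rather than taken from the
real data.

```python
import zlib

import numpy as np


def compression_ratio(arr, level=6):
    """Return raw size divided by zlib-compressed size of the array's bytes."""
    raw = arr.tobytes()
    return len(raw) / len(zlib.compress(raw, level))


# Hypothetical samples: a constant array and a random one.
repetitive = np.zeros(100000)
noisy = np.random.RandomState(0).rand(100000)

print("repetitive:", compression_ratio(repetitive))
print("noisy:     ", compression_ratio(noisy))
```

A high ratio on a sample of the real arrays would support
the idea that the memory difference comes from compression.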
In addition, what I see when I run queries on the data
is that, in general, the RDD computes the query in less
time than the DF. My hypothesis here is the following:
since data must be exchanged between worker nodes in
order to perform the queries, the RDD takes less time
because data is compressed, so communication between
workers takes less time.
To summarize, I would like to use the DF structure due
to its advantages (optimized scheduler, SQL support,
etc.), but what I see from real performance measurements
is that RDDs are much more efficient in my case (in both
execution time and memory consumption). So, I wonder if
there is a better way to store NP arrays in a DF, so that
I can benefit from the DF advantages while getting
performance comparable to that of RDDs.
Regarding the environment, my Spark version is 2.0.1
with Python 3.5.2, but I am not restricted to these
versions. I have not tuned any configuration settings
(everything uses default values).
Thanks in advance, and please, let me know if I forgot
to mention any detail or you need further information.
Kind regards,
Judit