A join is the natural answer, but this is a 10114-way join, which will
probably choke just planning it, let alone shuffling that much data. Maybe
you could tune your way out of it, but I'm not optimistic. It's just huge.

You could go off-road and lower-level to take advantage of the structure of
the data. What you effectively want is a "column bind". There is no such
operation in Spark (union is 'row bind'). You could do this with
zipPartitions, which is in the RDD API; to my surprise it's not in the
Python API, but it exists in Scala, and in R (!). If you can read several
RDDs of data, you can use this method to pair up their corresponding values
and ultimately get rows of 10114 values out. In fact that is how sparklyr
implements cbind on Spark, FWIW:
https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
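
For what it's worth, a minimal sketch of that pairing in the Scala RDD API
could look like the below. All names are made up, and it assumes each
column sits in its own RDD[Double] in row order, with every RDD partitioned
identically (row ids would have to ride along separately or be zipped back
in at the end):

  import org.apache.spark.rdd.RDD

  // Append one column's values to each accumulated row. zipPartitions
  // requires both RDDs to have the same number of partitions, and the
  // per-partition element counts must match for the pairing to be valid.
  def cbind(acc: RDD[Vector[Double]], col: RDD[Double]): RDD[Vector[Double]] =
    acc.zipPartitions(col) { (rows, values) =>
      rows.zip(values).map { case (row, v) => row :+ v }
    }

  // Fold the columns in one at a time; the result is rows of cols.size values.
  def cbindAll(cols: Seq[RDD[Double]]): RDD[Vector[Double]] =
    cols.tail.foldLeft(cols.head.map(Vector(_)))(cbind)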

The issue I see is that you can only zip a few RDDs at a time; you don't
want to zip 10114 of them in one call. You would probably have to do it
iteratively, and I don't know whether that runs into the same problem of
enormous plans.

I like the pivot idea. You can read the individual files as data rows: list
all the file names, parallelize that list with Spark, and write a UDF that
reads each file and generates its rows. If you can emit (file, index,
value), then groupBy index and pivot on file (I think?), that should be
about it. I don't think it needs additional hashing or anything. Not sure
how fast it is, but it seems more direct than the join as well.
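
To make that concrete, here's a rough Scala sketch of that shape. It's all
assumptions: made-up names, comma-separated files, a known list of paths,
and instead of a hand-rolled UDF it lets spark.read take every path at once
and tag rows with input_file_name():

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{first, input_file_name}

  val spark = SparkSession.builder.getOrCreate()

  // Placeholder: the 10114 file paths, however you list them.
  val fileNames: Seq[String] = Seq[String]()

  // Assumes "<rowId>,<value>" lines with no header; adjust sep/header
  // options to match the real files. Values stay strings here; cast later
  // if needed.
  val long = spark.read
    .option("header", "false")
    .csv(fileNames: _*)
    .toDF("rowId", "value")
    .withColumn("file", input_file_name())

  // One output row per row id, one column per file. Passing the known list
  // of pivot values skips the distinct scan (and the default 10000-value
  // pivot cap). Note input_file_name() returns full URIs, so the list has
  // to match that form.
  val wide = long
    .groupBy("rowId")
    .pivot("file", fileNames)
    .agg(first("value"))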

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson <aedav...@ucsc.edu.invalid>
wrote:

> Hi, I have a hard problem
>
>
>
> I have 10114 column vectors, each in a separate file. Each file has 2
> columns: the row id and a numeric value. The row ids are identical and in
> sort order. All the column vectors have the same number of rows. There are
> over 5 million rows. I need to combine them into a single table. The row
> ids are very long strings. The column names are about 20 chars long.
>
>
>
> My current implementation uses join. This takes a long time on a cluster
> with 2 workers totaling 192 vCPUs and 2.8 TB of memory. It often crashes; I
> mean totally dead, start over. Checkpoints do not seem to help; it still
> crashes and needs to be restarted from scratch. What is really surprising
> is that the final file size is only 213 GB! The way I got that file was to
> copy all the column vectors to a single BIG IRON machine and use unix cut
> and paste. It took about 44 min to run once I got all the data moved
> around. It was very tedious and error prone, and I had to move a lot of
> data around. Not a particularly reproducible process. I will need to rerun
> this three more times on different data sets of about the same size.
>
>
>
> I noticed that Spark has a union() function. It implements row bind. Any
> idea how it is implemented? Is it just map reduce under the covers?
>
>
>
> My thought was
>
>    1. load each column vector
>    2. maybe I need to replace the really long row id strings with integers
>    3. convert column vectors into row vectors using pivot (i.e. matrix
>    transpose)
>    4. union all the row vectors into a single table
>    5. pivot the table back so I have the correct column vectors
>
>
>
> I could replace the row ids and column names with integers if needed, and
> restore them later.
>
>
>
> Maybe I would be better off using many small machines? I assume memory is
> the limiting resource, not CPU. I notice that memory usage reaches 100%. I
> added several TBs of local SSD. I am not convinced that Spark is using the
> local disk.
>
>
>
>
>
> Will this perform better than join?
>
>
>
>    - The rows before the final pivot will be very, very wide (over 5
>    million columns)
>    - There will only be 10114 rows before the pivot
>
>
>
> I assume the pivots will shuffle all the data. I assume the column vector
> pivots are trivial. The final table pivot will be expensive; however, it
> will only need to be done once.
>
>
>
>
>
>
>
> Comments and suggestions appreciated
>
>
>
> Andy
>
>
>
>
>
