On 2 Aug 2017, at 20:05, Gourav Sengupta 
<gourav.sengu...@gmail.com> wrote:

Hi Steve,

I have written a sincere note of apology to everyone in a separate email. I 
sincerely request your forgiveness in advance if anything in my emails sounds 
impolite.

Let me first start by thanking you.

I know it looks like I formed all my opinions based on that document, but that 
is not the case at all. If you or anyone else executes the code that I have 
given, they will see what I mean. Code speaks louder and better than words for 
me.

So I am not saying you are wrong. I am asking you to verify, and expecting that 
someone will be able to correct, a set of understandings that a moron like me 
has gained after long hours of not having anything better to do.


SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored in 
HDFS with replication 2, and there is a HADOOP cluster of three nodes. All of 
these nodes have SPARK workers (executors) running on them. The files are 
stored in the following way:
-------------------------------------
| SYSTEM 1  | SYSTEM 2  | SYSTEM 3  |
| (worker1) | (worker2) | (worker3) |
| (master)  |           |           |
-------------------------------------
| file1.csv |           | file1.csv |
-------------------------------------
|           | file2.csv | file2.csv |
-------------------------------------
| file3.csv | file3.csv |           |
-------------------------------------





CONSIDERATION ON WHICH THE ABOVE SCENARIO IS BASED:
HDFS replication does not store the same file on every node in the cluster. 
So if I have three nodes and the replication factor is two, then the same file 
will be stored physically on two nodes in the cluster. Does that sound right?


HDFS breaks files up into blocks (default = 128MB). If a .csv file is > 128MB 
then it will be broken up into blocks:

file1.csv -> [block0001, block0002, block0003]

and each block will be replicated. With replication = 2 there will be two 
copies of each block, but the file itself can span > 2 hosts.
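
If you want to check that on your own cluster, here is a rough sketch (the path 
is made up; this assumes the spark-shell, where the "spark" session variable is 
already defined) which asks HDFS for the replication factor and the hosts 
holding each block:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs     = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val status = fs.getFileStatus(new Path("/data/file1.csv"))   // illustrative path
println(s"replication = ${status.getReplication}")           // e.g. 2: each block held by 2 datanodes

// one line per block: a multi-block file can span all three hosts even at replication = 2
fs.getFileBlockLocations(status, 0L, status.getLen).foreach { b =>
  println(s"offset=${b.getOffset} length=${b.getLength} hosts=${b.getHosts.mkString(", ")}")
}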


ASSUMPTION (STEVE, PLEASE CLARIFY THIS):
If SPARK is trying to process the records, then I am expecting that WORKER2 
should not be processing file1.csv, and similarly WORKER1 should not be 
processing file2.csv and WORKER3 should not be processing file3.csv, because if 
WORKER2 were to process file1.csv it would cause unnecessary network 
transmission of the file.


Spark prefers to schedule work locally, so as to save on network traffic, but 
it optimises for execution time over waiting for free workers on the node with 
the data. If a block is on nodes 2 and 3 but there is only a free thread on 
node 1, then node 1 gets the work.
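
That trade-off is tunable: spark.locality.wait (and its per-level variants) is 
how long the scheduler holds a task hoping for a data-local slot before falling 
back to a less-local one. A sketch, with illustrative values rather than 
recommendations:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("locality-demo")
  .master("spark://master:7077")             // illustrative standalone master URL
  .config("spark.locality.wait", "3s")       // default is 3s; raise it to favour locality over latency
  .config("spark.locality.wait.node", "3s")  // optional per-level override
  .getOrCreate()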

There are details on whether/how work across blocks takes place which I'm 
avoiding. For now, know that those formats which are "splittable" will have 
work scheduled by block. If you use Parquet/ORC/Avro for your data and compress 
with snappy, it will still be splittable. This gives you maximum performance, 
as more than one thread can work on different blocks. That is, if file1 is 
split into three blocks, three worker threads can process it.
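
As a rough illustration (the paths are made up; assumes the spark-shell session 
"spark"): convert the CSV to snappy-compressed Parquet and check how many 
partitions, roughly one per block, Spark will schedule over it:

// rewrite the CSV as snappy-compressed Parquet, which stays splittable
val df = spark.read.option("header", "true").csv("hdfs:///data/file1.csv")
df.write.option("compression", "snappy").parquet("hdfs:///data/file1_parquet")

// read it back and see how many partitions (and hence parallel tasks) Spark gives it
val back = spark.read.parquet("hdfs:///data/file1_parquet")
println(s"partitions = ${back.rdd.getNumPartitions}")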


ASSUMPTION BASED ON THE ABOVE ASSUMPTION (STEVE, ONCE AGAIN, PLEASE CLARIFY THIS):
If WORKER2 is not processing file1.csv, then why does it matter whether the 
file is present on that system at all? Should not SPARK just ask the workers to 
process the files which are available on their own nodes? In case both WORKER2 
and WORKER3 fail and are not available, then file2.csv will not be processed at 
all.


Locality is best-effort, not guaranteed.


ALSO, I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE EXECUTED 
(it's been pointed out that I am learning SPARK, and even I did not take more 
than 13 mins to set up the cluster and run the code).

Once you execute the code you will find that (a sketch of the kind of test is 
included after this list):
1. If the path starts with file:/// while reading back, then no error is 
reported, but the number of records reported back covers only the records on 
the worker that is co-located with the driver (master).
2. You will also notice that once you cache the file before writing, the 
partitions are distributed nicely across the workers; and while writing back, 
the dataframe partitions are written properly on the worker node on the master, 
but the workers on the other systems have their files written to a _temporary 
folder which does not get copied back to the main folder. In spite of this, the 
job is not reported as failed in SPARK.
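
Since the code itself isn't in this mail, here is a minimal sketch of the kind 
of test being described; the paths and options are illustrative, not the code 
that was actually posted:

// read a local-filesystem path on a multi-node standalone cluster with no shared filesystem
val df = spark.read.option("header", "true").csv("file:///home/user/data/file1.csv")
println(df.count())   // observation 1: no error, but only the records visible on the driver's node

df.cache()
df.write.mode("overwrite").csv("file:///home/user/data/out")
// observation 2: only the worker on the driver's node gets its output into the destination;
// the other workers' files stay under _temporary, yet the job is still reported as successful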

This gets into the "commit protocol". You don't want to know all the dirty 
details (*), but essentially it's this:

1. Every worker writes its output to a directory under the destination 
directory, something like 
'$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'.
2. It is the Spark driver which "commits" the job by moving the output of the 
individual workers from the temporary directories into $dest, then deleting 
$dest/_temporary.
3. To do that, it needs to be able to list all the output in $dest/_temporary.

In your case, only the output on the same node as the driver is being 
committed, because only those files can be listed and moved. The output on the 
other nodes isn't seen, so it isn't committed, nor cleaned up.
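
For what it's worth, the choice between the v1 and v2 commit algorithms (see 
the link at the end of this mail) is just a Hadoop property which Spark passes 
down to FileOutputCommitter. A sketch of how it would be selected; note that 
neither version removes the need for a filesystem the driver and every worker 
can see:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("committer-demo")
  // spark.hadoop.* properties are copied into the Hadoop Configuration the committer reads
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()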



Now, in my own world, if I see the following things happening, then something 
is going wrong (with me):
1. SPARK transfers files between systems to process them, instead of processing 
them locally (I do not have code to prove this, therefore it is just an 
assumption).
2. SPARK cannot determine when writes are failing on standalone cluster workers 
and reports success (code is there for this).
3. SPARK reports back only the number of records on the worker running on the 
master node when count() is called, without reporting an error, while using 
file:///, and reports an error when I mention the path without file:/// (for 
SPARK 2.1.x onwards; code is there for this).


As everyone's been saying, file:// requires a shared filestore, with uniform 
paths everywhere. That's needed to list the files to process, read the files in 
the workers, and commit the final output. NFS cross-mounting is the simplest 
way to do this, especially as HDFS is overkill for three nodes: more services 
to keep running, no real fault tolerance. Export a directory tree from one of 
the servers, give the rest access to it, and don't worry about bandwidth use, 
as the shared disk itself will become the bottleneck.
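
Once the same directory tree is mounted at the same path on every node (say 
/shared, an illustrative mount point), plain file:// URIs behave as expected 
for both reading and committing:

// /shared is an NFS export mounted identically on the master and all workers (assumption)
val df = spark.read.option("header", "true").csv("file:///shared/data/file1.csv")
println(df.count())

df.write.mode("overwrite").parquet("file:///shared/output/file1_parquet")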



I very sincerely hope that, with your genuine help, the bar of language and 
social skills will be lowered for me, and that everyone will find a way to 
excuse me and not treat this email as a means to measure my extremely versatile 
and amazingly vivid social skills. It will be a lot of help to just focus on 
the facts related to machines, data, errors and (the language that I somehow 
understand better) code.


My sincere apologies once again, as I am 100% sure that I have not met the 
required standard of social and language skills.

Thanks a ton once again for your kindness, patience and understanding.


Regards,
Gourav Sengupta


* for the curious, the details of the v1 and v2 commit protocols are
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md

Like I said: you don't want to know the details, and you really don't want to 
step through Hadoop's FileOutputCommitter to see what's going on. The Spark 
side is much easier to follow.
