Hi,

We ran into an interesting issue that I wanted to share, and I'd also
like thoughts on whether anything should be done about it. We run our own
Hadoop cluster and recently deployed an Observer NameNode to take some
burden off our Active NameNode. We mostly use Delta Lake as our format, and
everything seemed great. But while running some one-off analytics we hit a
problem. Specifically, we did something like:

"df.<do some analytic>.repartition(1).write.csv()"

This is our quick way of creating a CSV we can download and work with
when the result is some small aggregation. However, we kept getting an
empty output directory (just a _SUCCESS file and nothing else), even though
the Spark UI said a positive number of rows had been written. We eventually
traced it back to our switch to the ObserverReadProxyProvider in our
notebook sessions, and finally figured out it was due to the behavior
described in the "Maintaining Client Consistency" section of
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html
.

After setting the auto msync period to a low value, the writes started
working. I kept digging and realized it comes down to how the
FileOutputCommitter v1 algorithm works. During the commitJob phase, the
AM/driver does a file system listing on the output directory to find all
the finished task output files it needs to move to the top-level output
directory. Since that listing is a read, the Observer can serve it, and the
Observer's view can be stale and miss the files the executors just finished
writing. The auto msync fixed it because it forced the driver to do an
msync before the read took place. However, frequent auto msyncs can defeat
some of the performance benefits of the Observer.
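
For anyone who wants to try the same workaround, here is a minimal sketch
of what the setting looks like, assuming a nameservice named "mycluster"
(a placeholder; use your own), set at session creation so it reaches the
HDFS client via the spark.hadoop.* passthrough:

    import org.apache.spark.sql.SparkSession

    // Sketch: lower the ObserverReadProxyProvider's auto-msync period for
    // this session. "mycluster" is a placeholder nameservice; a low period
    // (or 0 to msync before every read) keeps the driver's state id fresh
    // at the cost of extra round trips to the Active NameNode.
    val spark = SparkSession.builder()
      .appName("observer-read-example")
      .config("spark.hadoop.dfs.client.failover.observer.auto-msync-period.mycluster",
        "500ms")
      .getOrCreate()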

The v2 algorithm shouldn't have this issue because the tasks themselves
move their output to the final directory when they finish, and the driver
simply adds the _SUCCESS file at the end. Hadoop's default is v2, but Spark
overrides that to use v1 by default because of potential correctness
issues, which is fair. While this is mostly a Hadoop issue, the fact that
Spark defaults to the v1 algorithm makes it somewhat of a Spark problem.
Things like Delta Lake (or even regular structured streaming output, I
think) shouldn't be affected because they write files directly and track
them in a transaction log, so no file moving happens on the driver.
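
For completeness, switching a session over to the v2 committer (with the
caveat above about why Spark defaults to v1) looks something like the
following sketch; setting it on the SparkContext's Hadoop configuration is
one way I believe it reaches the file output committer:

    // Sketch: request the v2 FileOutputCommitter algorithm so each task
    // moves its own output into place and the driver never does the
    // directory listing that can hit a stale Observer. Spark defaults to
    // v1 on purpose because v2 can leave partial output visible if a job
    // fails mid-commit.
    spark.sparkContext.hadoopConfiguration
      .set("mapreduce.fileoutputcommitter.algorithm.version", "2")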

So I mostly wanted to share this in case anyone else runs into the same
issue. But I also wanted to get thoughts on whether anything should be done
to prevent it from happening. Several ideas, in no particular order:

- Perform an msync during Spark's commitJob before calling the parent
commitJob (see the sketch after this list). Since msync is only available
in newer Hadoop APIs, this probably isn't even possible while maintaining
compatibility with older Hadoop versions.
- Attempt to get an msync added upstream in Hadoop's v1 committer's
commitJob
- Attempt to detect the use of the ObserverReadProxyProvider and either
force the v2 committer on the Spark side or just print a warning that you
need to either use the v2 committer or set the auto msync period very low
(or to 0) to guarantee correct output.
- Simply add something to the Spark docs somewhere about things to know
when using the ObserverReadProxyProvider
- Assume that if you are capable of running your own Hadoop cluster with
an Observer NameNode, you will recognize this limitation quickly. It only
took me about an hour to figure out, so that's also fair.
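
On the first idea, this is roughly the kind of thing I have in mind, as an
untested sketch assuming Hadoop 3.3+ (where FileSystem#msync() exists) and
a recent Spark; the class and its name are hypothetical:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.JobContext
    import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

    // Hypothetical committer: msync before the v1 committer's driver-side
    // listing so the client's state id catches up to the Active NameNode
    // and an Observer can't serve a stale view of the output directory.
    class MsyncCommitProtocol(
        jobId: String,
        path: String,
        dynamicPartitionOverwrite: Boolean = false)
      extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {

      override def commitJob(
          jobContext: JobContext,
          taskCommits: Seq[FileCommitProtocol.TaskCommitMessage]): Unit = {
        val fs = new Path(path).getFileSystem(jobContext.getConfiguration)
        try {
          fs.msync() // sync this client's view with the Active NameNode
        } catch {
          case _: UnsupportedOperationException =>
            () // not an HA HDFS client (e.g. local FS), nothing to sync
        }
        super.commitJob(jobContext, taskCommits)
      }
    }

Something like spark.sql.sources.commitProtocolClass could then point at
it, if I'm reading the internals right, but that is exactly the newer-API
compatibility problem mentioned above.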

Thanks,

-- 
Adam
