I have created a JIRA (https://issues.apache.org/jira/browse/SPARK-36810) to track this issue and will look into it further in the coming days.
Regards,
Venkata Krishnan

On Tue, Sep 7, 2021 at 5:57 AM Steve Loughran <ste...@cloudera.com.invalid> wrote:

FileContext came in Hadoop 2.x with a cleaner split of client API and driver implementation, and stricter definitions of some things considered broken in FileSystem (rename() corner cases, the notion of a current directory, ...).

But because it came out after the platform was broadly adopted and was never backported to Hadoop 1, it never got picked up. So even though it's tagged as the "newer" API, it's not the one used by apps. And as it relays to FileSystem, anyone doing interesting things at the FS client level can just add it there and have it adopted in both places.

The design of FileContext _is_ better, but the extra layers get in the way of the interesting games you can play to deliver performance speedups against cloud storage. That's why we tend to work in FileSystem, with the FS API spec and contract tests essentially reverse engineering what it is that HDFS does and what applications expect (thread safety of input and output streams, rename() return codes, ...).

FileSystem is never going to go away. I'd like to fix rename(), but we can't change rename/2's semantics, and simply exposing the protected rename/3 isn't sufficient. See https://github.com/apache/hadoop/pull/2735 for my lapsed work. It got too complicated for spare-time work, especially when there is other work with more tangible benefit that doesn't have good alternatives (https://github.com/apache/hadoop/pull/2584).

On Mon, 6 Sept 2021 at 16:49, Adam Binford <adam...@gmail.com> wrote:

Sharing some things I learned looking into the Delta Lake issue:

- This was a read-after-write inconsistency _all on the driver_. Specifically, Delta Lake currently uses the FileSystem API for reading table logs (for greater compatibility) but the FileContext API for writes (for atomic renames). This led to the FileSystem reads becoming stale, as they didn't have to update their state ID after the FileContext writes from a different DFSClient.

- The FileContext API generally seems less suitable for an HA HDFS setup, as each FileContext object creates a new DFSClient that has to re-find the active/observer nodes. I know these are cheap operations, but it's still extra overhead and not ideal. This is compounded by the fact that the name "FileContext" is misleading: it sounds like something you should create a new instance of per file you want to interact with, rather than something to reuse across a file system. There's been an open issue for 12 years about adding caching to the FileContext/AbstractFileSystem API, but there seems to be some hesitation there due to what happens when you upgrade HDFS while a client is still active. This doesn't appear to be a huge issue directly in Spark, since the main place FileContext is used is the structured streaming commit log (for atomic renames), but it is something to look out for in third-party libraries. I do see a lot of warnings about the HDFSMetadataLog looking for the active namenode, which I haven't looked into much.
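As a minimal sketch of that caching difference (the "mycluster" nameservice and paths are placeholders, and an HA HDFS client configuration is assumed): FileSystem.get() is served from an internal cache, while every FileContext.getFileContext() call builds a fresh client, so a FileContext should be created once per filesystem and reused, not created per file.

  import java.net.URI
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileContext, FileSystem, Path}

  object FileContextReuse {
    def main(args: Array[String]): Unit = {
      val conf = new Configuration()
      val uri  = new URI("hdfs://mycluster/")   // placeholder HA nameservice

      // FileSystem.get() goes through an internal cache, so repeated calls
      // with the same URI/conf/user normally return the same client instance.
      val fs1 = FileSystem.get(uri, conf)
      val fs2 = FileSystem.get(uri, conf)
      println(s"FileSystem cached: ${fs1 eq fs2}")    // expected: true

      // FileContext has no such cache: each getFileContext() call builds a
      // new AbstractFileSystem (and, for HDFS, a new DFSClient that has to
      // rediscover the active/observer NameNodes).
      val fc1 = FileContext.getFileContext(uri, conf)
      val fc2 = FileContext.getFileContext(uri, conf)
      println(s"FileContext cached: ${fc1 eq fc2}")   // expected: false

      // So hold on to one FileContext per filesystem and reuse it for all paths.
      val fc = FileContext.getFileContext(uri, conf)
      Seq("/data/a", "/data/b").foreach { p =>
        val st = fc.getFileStatus(new Path(p))        // reuses the same client
        println(s"$p -> ${st.getLen} bytes")
      }
    }
  }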
I'd expect to only see those warnings once, since the HDFSMetadataLog seems to properly reuse a single FileContext instance.

Adam

On Fri, Aug 20, 2021 at 2:22 PM Steve Loughran <ste...@cloudera.com.invalid> wrote:

ooh, this is fun.

v2 isn't safe to use unless every task attempt generates files with exactly the same names and it is okay to intermingle the output of two task attempts.

This is because a task commit can fail partway through (or worse, the process can pause for a full GC) and a second attempt can then be committed. Spark's commit algorithm assumes that it's OK to commit a second attempt if the first attempt failed, timed out, etc. That holds for v1, but not for v2.

Therefore: a (non-binding) -1 to any proposal to switch to v2. You would only be trading one set of problems for another.

I think the best fix here is to do it in the FileOutputCommitter. Be aware that we are all scared of that class and always want to do the minimum necessary.

I will certainly add it to the manifest committer, whose "call for reviewers and testing" is still open, especially testing all the way through Spark: https://github.com/apache/hadoop/pull/2971

That committer works with HDFS too; I'd be interested in anyone benchmarking it on queries with deep/wide directory trees and with different tasks all generating output for the same destination directories (i.e. file rename dominates in job commit, not task rename). I'm not optimising it for HDFS; it's trying to deal with cloud storage quirks like non-atomic dir rename (GCS), slow list/file rename performance (everywhere), deep directory delete timeouts, and other cloud-storage-specific issues.

Further reading on the commit problem in general: https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_release_2021-05-17

-Steve

On Tue, 17 Aug 2021 at 17:39, Adam Binford <adam...@gmail.com> wrote:

Hi,

We ran into an interesting issue that I wanted to share, as well as get thoughts on whether anything should be done about it. We run our own Hadoop cluster and recently deployed an Observer Namenode to take some burden off of our Active Namenode. We mostly use Delta Lake as our format, and everything seemed great. But when running some one-off analytics we ran into an issue. Specifically, we did something like:

"df.<do some analytic>.repartition(1).write.csv()"

This is our quick way of creating a CSV we can download and do other things with when our result is some small aggregation. However, we kept getting an empty output directory (just a _SUCCESS file and nothing else), even though the Spark UI said it wrote some positive number of rows. Eventually I traced it back to our update to use the ObserverReadProxyProvider in our notebook sessions.
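For context, this is roughly how the observer read path and the auto-msync period from the ObserverNameNode documentation tend to be wired up on the client side. This is only a sketch: the nameservice name "mycluster" is a placeholder and the exact keys and values depend on the cluster.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("observer-read-example")
    // Route HDFS client calls through the observer-aware proxy provider.
    .config("spark.hadoop.dfs.client.failover.proxy.provider.mycluster",
            "org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider")
    // 0 should force an msync before every read, trading observer
    // scalability for read-after-write consistency; a larger value
    // (e.g. 500ms) reduces msync traffic but allows some staleness.
    .config("spark.hadoop.dfs.client.failover.observer.auto-msync-period.mycluster", "0ms")
    .getOrCreate()

  // With a low auto-msync period, a driver-side listing after executor
  // writes should see the new files, so a pattern like this produces
  // non-empty output:
  // df.repartition(1).write.csv("hdfs://mycluster/tmp/out")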
I finally figured out it was due to the "Maintaining Client Consistency" behaviour described in https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html.

After setting the auto msync period to a low value, the writes started working. I kept digging and realized it's due to how the FileOutputCommitter v1 algorithm works. During the commitJob phase, the AM/driver does a file system listing on the output directory to find all the finished task output files it needs to move to the top-level output directory. Since this is a read, the observer can serve the request, but its view can be out of date and not include the newly written files that just finished on the executors. The auto msync fixed it because it forced the driver to do an msync before the read took place. However, frequent auto msyncs can defeat some of the performance benefits of the Observer.

The v2 algorithm shouldn't have this issue, because the tasks themselves copy their output to the final directory when they finish and the driver simply adds the _SUCCESS file at the end. Hadoop's default is v2, but Spark overrides that to use v1 by default because of potential correctness issues, which is fair. While this is mostly an issue with Hadoop, the fact that Spark defaults to the v1 algorithm makes it somewhat of a Spark problem. Also, things like Delta Lake (or even regular structured streaming output, I think) shouldn't have issues, because they do direct writes with a transaction log, so no file moving on the driver is involved.

So I mostly wanted to share that in case anyone else runs into this same issue, but I also wanted to get thoughts on whether anything should be done to prevent it from happening. Several ideas, in no particular order:

- Perform an msync during Spark's commitJob before calling the parent commitJob. Since msync is only available in newer APIs, this probably isn't even possible while maintaining compatibility with older Hadoop versions.
- Attempt to get an msync added upstream in Hadoop's v1 committer's commitJob.
- Attempt to detect the use of the ObserverReadProxyProvider and either force the v2 committer on the Spark side, or just print a warning that you need to use the v2 committer or set the auto msync period very low (or to 0) to guarantee correct output.
- Simply add something to the Spark docs about things to know when using the ObserverReadProxyProvider.
- Assume that anyone capable of running their own Hadoop cluster with an Observer Namenode will recognize this limitation quickly; it only took me about an hour to figure out, so that's also fair.

Thanks,

--
Adam

--
Adam Binford
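A rough sketch of the first idea in Adam's list, an msync before the v1 commitJob listing. This assumes a Hadoop 3.3+ client where FileSystem#msync() exists; the committer subclass is hypothetical and is not how Spark or Hadoop actually implement anything.

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.hadoop.mapreduce.JobContext
  import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

  // Hypothetical committer that brings the driver's view of the namespace
  // up to date before the v1 algorithm lists task output directories.
  class MsyncFileOutputCommitter(outputPath: Path, context: JobContext)
      extends FileOutputCommitter(outputPath, context) {

    override def commitJob(context: JobContext): Unit = {
      val fs = outputPath.getFileSystem(context.getConfiguration)
      try {
        // For ObserverReadProxyProvider clients this syncs the DFSClient's
        // state ID with the active NameNode, so the subsequent listing of
        // task outputs is not served from a stale observer snapshot.
        fs.msync()
      } catch {
        case _: UnsupportedOperationException => // non-HDFS or older clients
      }
      super.commitJob(context)
    }
  }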