I have created a JIRA (https://issues.apache.org/jira/browse/SPARK-36810)
to track this issue. Will look into this issue further in the coming days.

Regards
Venkata krishnan


On Tue, Sep 7, 2021 at 5:57 AM Steve Loughran <ste...@cloudera.com.invalid>
wrote:

> FileContext came in Hadoop 2.x with a cleaner split of client API and
> driver implementation, and stricter definition of some things considered
> broken in FileSystem (rename() corner cases, notion of a current directory,
> ...)
>
> But as it came out after the platform was broadly adopted and was never
> backported to Hadoop 1, it never got picked up... So even though it's tagged
> as the "newer" API, it's not the one used by apps. And as it relays to
> FileSystem, anyone doing interesting things at the FS client level can just
> add it there and have it adopted in both places.
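>
> A minimal sketch (in Scala) of the two entry points, just to make the layering
> concrete; the paths and nameservice here are made up for illustration:
>
>   import org.apache.hadoop.conf.Configuration
>   import org.apache.hadoop.fs.{FileContext, FileSystem, Options, Path}
>
>   val conf = new Configuration()
>   val src  = new Path("hdfs://ns1/tmp/src")
>   val dst  = new Path("hdfs://ns1/tmp/dst")
>
>   // FileSystem: the older API that apps actually use; rename/2 returns a
>   // boolean and carries the corner cases mentioned above.
>   val fs = FileSystem.get(src.toUri, conf)
>   val renamed = fs.rename(src, dst)
>
>   // FileContext: the stricter API, layered over an AbstractFileSystem;
>   // rename/3 takes explicit Options.Rename flags and throws on failure.
>   val fc = FileContext.getFileContext(src.toUri, conf)
>   fc.rename(src, dst, Options.Rename.OVERWRITE)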
>
> The design of FileContext _is_ better, but the extra layers get in the way
> of the interesting games you can play to deliver performance speedups
> against cloud storage. So that's why we tend to work in FileSystem, with
> the FS API spec and contract tests essentially reverse engineering what it
> is that HDFS does and which applications expect (thread safety of input and
> output streams, rename() return codes, ...)
>
> FileSystem is never going to go away. I'd like to fix rename() but we
> can't change rename/2's semantics, and making the protected rename/3 public
> isn't sufficient. See https://github.com/apache/hadoop/pull/2735
> for my lapsed work. Got too complicated for some spare-time work,
> especially when there are others with more tangible benefit which don't
> have good alternatives (https://github.com/apache/hadoop/pull/2584
> <https://urldefense.com/v3/__https://github.com/apache/hadoop/pull/2584__;!!IKRxdwAv5BmarQ!PNuufr0_QHextDKhP3kMaqTc2HJjvEcE50VXE4ah12vXr5C-1nIBGhBi1IJO5sU$>
> )
>
> On Mon, 6 Sept 2021 at 16:49, Adam Binford <adam...@gmail.com> wrote:
>
>> Sharing some things I learned looking into the Delta Lake issue:
>>
>> - This was a read after write inconsistency _all on the driver_.
>> Specifically it currently uses the FileSystem API for reading table logs
>> for greater compatibility, but the FileContext API for writes for atomic
>> renames. This led to the FileSystem reads becoming stale as they didn't
>> have to update their state ID after the FileContext writes from a different
>> DFSClient (a rough sketch of this mismatch follows after this list).
>> - The FileContext API generally seems less suitable for an HA HDFS setup,
>> as each FileContext object creates a new DFSClient that has to re-find the
>> active/observer nodes. I know these are cheap operations, but still extra
>> overhead and not ideal. This is compounded by the fact that the name
>> "FileContext" is misleading, as it sounds like something you should create
>> a new instance of per file you want to interact with, and not try to reuse
>> across a file system. There's been an open issue for 12 years about adding
>> caching to the FileContext/AbstractFileSystem API, but there seems to be some
>> hesitation there due to what happens when you update HDFS while a client is
>> still active. This doesn't appear to be a huge issue directly in Spark,
>> since the main place FileContext is used is for the structured streaming
>> commit log for atomic renames, but something to look out for in third party
>> libraries. I do see a lot of warnings about the HDFSMetadataLog looking for
>> the active namenode which I haven't looked into much. I'd expect to only
>> see that once since it seems to properly reuse a single FileContext
>> instance.
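>>
>> A rough sketch of that first mismatch (Scala; the path is hypothetical and the
>> comments reflect the behaviour described above, not a full description of the
>> client internals):
>>
>>   import org.apache.hadoop.conf.Configuration
>>   import org.apache.hadoop.fs.{FileContext, FileSystem, Path}
>>
>>   val conf = new Configuration()
>>   val log  = new Path("hdfs://ns1/table/_delta_log/00000000000000000001.json")
>>
>>   // Writer side: each FileContext.getFileContext() call builds a fresh
>>   // AbstractFileSystem, which for HDFS means a new DFSClient with its own
>>   // state ID (and its own active/observer discovery).
>>   val fc = FileContext.getFileContext(log.toUri, conf)
>>
>>   // Reader side: FileSystem.get() returns the cached instance, backed by a
>>   // *different* DFSClient whose state ID never advanced past the write above,
>>   // so a read or listing served by the observer can miss the just-written file.
>>   val fs = FileSystem.get(log.toUri, conf)
>>   val seen = fs.exists(log)   // may be false until the observer catches up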
>>
>> Adam
>>
>> On Fri, Aug 20, 2021 at 2:22 PM Steve Loughran
>> <ste...@cloudera.com.invalid> wrote:
>>
>>>
>>> ooh, this is fun,
>>>
>>> v2 isn't safe to use unless every task attempt generates files with
>>> exactly the same names and it is okay to intermingle the output of two task
>>> attempts.
>>>
>>> This is because a task commit can fail partway through (or worse, the
>>> process can pause for a full GC) while a second attempt is committed. Spark's
>>> commit algorithm assumes that it's OK to commit a 2nd attempt if the first
>>> attempt failed, timed out, etc. That assumption holds for v1, but not for v2.
>>>
>>> Therefore: a (nonbinding) -1 to any proposal to switch to v2. You would
>>> only be trading one set of problems for another.
>>>
>>>
>>> I think the best fix here is to do it in the FileOutputCommitter. Be
>>> aware that we are all scared of that class and always want to do the
>>> minimum necessary.
>>>
>>> I will certainly add to the manifest committer, whose "call for
>>> reviewers and testing" is still open, especially all the way through Spark
>>> https://github.com/apache/hadoop/pull/2971
>>>
>>> That committer works with HDFS too, I'd be interested in anyone
>>> benchmarking it on queries with deep/wide directory trees and with
>>> different tasks all generating output for the same destination directories
>>> (i.e. file rename dominates in job commit, not task rename). I'm not
>>> optimising it for HDFS; it's trying to deal with cloud storage quirks like
>>> nonatomic dir rename (GCS), slow list/file rename perf (everywhere), deep
>>> directory delete timeouts, and other cloud storage specific issues.
>>>
>>>
>>> Further reading on the commit problem in general
>>> https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_release_2021-05-17
>>>
>>> -Steve
>>>
>>>
>>>
>>> On Tue, 17 Aug 2021 at 17:39, Adam Binford <adam...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We ran into an interesting issue that I wanted to share as well as get
>>>> thoughts on if anything should be done about this. We run our own Hadoop
>>>> cluster and recently deployed an Observer Namenode to take some burden off
>>>> of our Active Namenode. We mostly use Delta Lake as our format, and
>>>> everything seemed great. But when running some one-off analytics we ran
>>>> into an issue. Specifically, we did something like:
>>>>
>>>> "df.<do some analytic>.repartition(1).write.csv()"
>>>>
>>>> This is our quick way of creating a CSV we can download and do other
>>>> things with when our result is some small aggregation. However, we kept
>>>> getting an empty output directory (just a _SUCCESS file and nothing else),
>>>> even though in the Spark UI it says it wrote some positive number of rows.
>>>> Eventually traced it back to our update to use the
>>>> ObserverReadProxyProvider in our notebook sessions. I finally figured out
>>>> it was due to the behavior described in the "Maintaining Client Consistency" section of
>>>> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html
>>>> .
>>>>
>>>> After setting the auto msync period to a low value, the writes started
>>>> working. I kept digging in and realized it's due to how the
>>>> FileOutputCommitter v1 algorithm works. During the commitJob phase, the
>>>> AM/driver does a file system listing on the output directory to find all
>>>> the finished task output files it needs to move to the top level output
>>>> directory. But since this is a read, the observer can serve this request,
>>>> but it can be out of date and not see the newly written files that just
>>>> finished from the executors. The auto msync fixed it because it forced the
>>>> driver to do an msync before the read took place. However, frequent auto
>>>> msyncs can defeat some of the performance benefits of the Observer.
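>>>>
>>>> For reference, the knob I was tuning is the client-side auto-msync period
>>>> described in the Observer NameNode docs; a hypothetical way to set it for a
>>>> Spark session (the "ns1" nameservice and the 0ms value are just examples,
>>>> and 0ms effectively forces an msync before every read):
>>>>
>>>>   import org.apache.spark.sql.SparkSession
>>>>
>>>>   val spark = SparkSession.builder()
>>>>     .config("spark.hadoop.dfs.client.failover.observer.auto-msync-period.ns1", "0ms")
>>>>     .getOrCreate()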
>>>>
>>>> The v2 algorithm shouldn't have this issue because the tasks themselves
>>>> copy the output to the final directory when they finish, and the driver
>>>> simply adds the _SUCCESS at the end. And Hadoop's default is v2, but Spark
>>>> overrides that to use v1 by default because of potential correctness
>>>> issues, which is fair. While this is mostly an issue with Hadoop, the fact
>>>> that Spark defaults to the v1 algorithm makes it somewhat of a Spark
>>>> problem. Also, things like Delta Lake (or even regular structured streaming
>>>> output I think) shouldn't have issues, because they are direct-write and
>>>> transaction-log based, so there's no file moving on the driver involved.
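>>>>
>>>> For anyone who wants to check which algorithm a given job actually ran with,
>>>> the underlying Hadoop key is mapreduce.fileoutputcommitter.algorithm.version;
>>>> a small sketch (assuming an existing SparkSession named spark):
>>>>
>>>>   // Inspect the committer algorithm version the Hadoop conf will hand out
>>>>   val hadoopConf = spark.sparkContext.hadoopConfiguration
>>>>   val v = hadoopConf.get("mapreduce.fileoutputcommitter.algorithm.version", "1")
>>>>   println(s"FileOutputCommitter algorithm version: $v")
>>>>
>>>>   // Switching to v2 (e.g. spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
>>>>   // at submit time) avoids the driver-side listing, but see the task-commit
>>>>   // correctness caveats discussed earlier in this thread.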
>>>>
>>>> So I mostly wanted to share that in case anyone else runs into this
>>>> same issue. But also wanted to get thoughts on if anything should be done
>>>> about this to prevent it from happening. Several ideas in no particular
>>>> order:
>>>>
>>>> - Perform an msync during Spark's commitJob before calling the parent
>>>> commitJob (rough sketch after this list). Since msync is only available in
>>>> newer APIs, this probably isn't even possible while maintaining compatibility
>>>> with older Hadoop versions.
>>>> - Attempt to get an msync added upstream in Hadoop's v1 committer's
>>>> commitJob
>>>> - Attempt to detect the use of the ObserverReadProxyProvider and either
>>>> force using v2 committer on the spark side or just print out a warning that
>>>> you either need to use the v2 committer or you need to set the auto msync
>>>> period very low or 0 to guarantee correct output.
>>>> - Simply add something to the Spark docs somewhere about things to know
>>>> when using the ObserverReadProxyProvider
>>>> - Assume that if you are capable of creating your own Hadoop cluster
>>>> with an Observer Namenode, you will recognize this limitation quickly (it
>>>> only took me about an hour to figure out, so that's also fair)
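>>>>
>>>> A rough sketch of the first idea, assuming Hadoop 3.3+ where FileSystem
>>>> exposes msync(); the committer class name is hypothetical, and older Hadoop
>>>> releases would need a version check or reflection:
>>>>
>>>>   import org.apache.hadoop.fs.Path
>>>>   import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
>>>>   import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
>>>>
>>>>   class MsyncFileOutputCommitter(output: Path, context: TaskAttemptContext)
>>>>       extends FileOutputCommitter(output, context) {
>>>>
>>>>     override def commitJob(jobContext: JobContext): Unit = {
>>>>       val fs = output.getFileSystem(jobContext.getConfiguration)
>>>>       // Catch this client up to the latest namenode state so the listing
>>>>       // below sees the files the tasks just wrote through other clients.
>>>>       try fs.msync()
>>>>       catch { case _: UnsupportedOperationException => () } // non-HDFS filesystems
>>>>       super.commitJob(jobContext)
>>>>     }
>>>>   }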
>>>>
>>>> Thanks,
>>>>
>>>> --
>>>> Adam
>>>>
>>>
>>
>> --
>> Adam Binford
>>
>
