Thanks, Steve, for answering in detail. I had the same impression as Chandan
about that line: it contradicted my understanding, since the rename operation
itself in HDFS is atomic, and I didn't realize the change was aimed at object
stores.

I learned a lot about object stores from your answer. Thanks again.

Jungtaek Lim (HeartSaVioR)

On Wed, Oct 3, 2018 at 2:48 AM, chandan prakash <chandanbaran...@gmail.com> wrote:

> Thanks a lot Steve and Jungtaek for your answers.
> Steve,
> You explained it really well and in depth.
>
> I understood that the existing old implementation was not correct for
> object stores like S3, and the new implementation will address that. For
> better performance, we should choose a direct-write-based checkpoint
> rather than a rename-based one (which we can implement using the new
> CheckpointFileManager abstraction).
> My confusion was because of this line in the PR:
> *This is incorrect as rename is not atomic in HDFS FileSystem
> implementation*
> I thought the above line meant that the existing old implementation is not
> correct for the HDFS file system as well, so I wanted to understand if
> there was something I was missing. The new implementation addresses issues
> with object stores like S3, not HDFS.
> Thanks again for your explanation; I am sure it will help a lot of other
> code readers as well.
>
> Regards,
> Chandan
>
>
>
> On Mon, Oct 1, 2018 at 5:37 PM Steve Loughran <ste...@hortonworks.com>
> wrote:
>
>>
>>
>> On 11 Aug 2018, at 17:33, chandan prakash <chandanbaran...@gmail.com>
>> wrote:
>>
>> Hi All,
>> I was going through this pull request about the new CheckpointFileManager
>> abstraction in structured streaming coming in 2.4:
>> https://issues.apache.org/jira/browse/SPARK-23966
>> https://github.com/apache/spark/pull/21048
>>
>> I went through the code in detail and found it will introduce a very
>> nice abstraction which is much cleaner and more extensible for direct-write
>> file systems like S3 (in addition to the current HDFS file system).
>>
>> *But I am unable to understand: is it really solving some problem in the
>> existing State Store code in Spark 2.3?*
>>
>> *My questions related to the above statements in the State Store code:*
>> *PR description*:: "Checkpoint files must be written atomically such
>> that *no partial files are generated*."
>> *QUESTION*: When are partial files generated in the current code? I can
>> see that data is first written to a temp-delta file and then renamed to the
>> version.delta file. If something bad happens, the task will fail due to the
>> thrown exception and abort() will be called on the store to close and delete
>> tempDeltaFileStream. That looks quite clean; in what case might partial
>> files be generated?
>>
>>
>> I suspect the issue is that as files are written to a "classic" Posix
>> store, flush/sync operations can result in the intermediate data being
>> visible to others. Which is why the convention for checkpointing/commit
>> operations is: write to temp & rename. Which is not what you want for
>> object stores, especially S3.
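>>
>> A minimal sketch of that write-to-temp-and-rename pattern against the plain
>> Hadoop FileSystem API (the method, path, and callback names here are
>> illustrative, not Spark's actual CheckpointFileManager code):
>>
>>     import org.apache.hadoop.fs.{FileSystem, Path}
>>
>>     // Hedged sketch: write everything to a temp file, then rename into place.
>>     def renameBasedCommit(fs: FileSystem, tempPath: Path, finalPath: Path)
>>                          (write: java.io.OutputStream => Unit): Unit = {
>>       val out = fs.create(tempPath, true)  // overwrite any stale temp file
>>       try {
>>         write(out)   // partial data is only ever visible under tempPath
>>       } finally {
>>         out.close()
>>       }
>>       // Atomic on HDFS; an O(data), non-atomic copy on S3-like stores.
>>       if (!fs.rename(tempPath, finalPath)) {
>>         fs.delete(tempPath, false)
>>         throw new java.io.IOException(s"failed to rename $tempPath to $finalPath")
>>       }
>>     }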
>>
>>
>>
>> *PR description*:: *State Store behavior is incorrect - HDFS FileSystem
>> implementation does not have atomic rename*
>> *QUESTION*: The HDFS filesystem rename operation is atomic. I think the
>> above line takes into account checking whether the file already exists and
>> then taking the appropriate action, which together makes the renaming
>> operation multi-step and hence non-atomic. But why is this behaviour
>> incorrect? Even if multiple executors try to write to the same
>> version.delta file, only the first of them will succeed; the second one
>> will see the file exists and will delete its temp-delta file. Looks good.
>>
>>
>> HDFS single file and dir rename is atomic; it grabs a lock on the
>> metadata store, does the change, and unlocks it. If you are doing any FS op
>> which explicitly renames more than one file in your commit, you lose
>> atomicity. If there's a check + rename then yes, it's two steps, unless you
>> can use create(path, overwrite=false) to create some lease file where you
>> know that the creation is exclusive & atomic for HDFS + Posix, and generally
>> not-at-all for the stores, especially S3, which can actually cache the 404
>> in its load balancers for a few tens of milliseconds.
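>>
>> For illustration, a hedged sketch of that create(path, overwrite=false)
>> lease-file trick (reliable on HDFS/Posix, not on the object stores above;
>> tryAcquireLease is an illustrative name, not a real API):
>>
>>     // On HDFS, create(path, overwrite = false) fails if the file already
>>     // exists, and the existence check + create is a single atomic step.
>>     def tryAcquireLease(fs: FileSystem, leasePath: Path): Boolean =
>>       try {
>>         fs.create(leasePath, false).close()
>>         true    // we created the lease file first
>>       } catch {
>>         case _: org.apache.hadoop.fs.FileAlreadyExistsException => false
>>         case _: java.io.IOException => false  // HDFS may surface a plain IOException
>>       }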
>>
>> For object stores, you are in a different world of pain:
>>
>> S3: nope; O(files + data) + observable + partial failures. List
>> inconsistency + caching of negative GET/HEAD to defend against DoS.
>> wasb: no, except for bits of the tree where you enable leases, something
>> which increases the cost of operations. O(files), with the odd pause if some
>> shard movement has to take place.
>> Google GCS: not sure, but it is O(files).
>> Azure ABFS: not atomic yet. As the code says:
>>
>>     if (isAtomicRenameKey(source.getName())) {
>>       LOG.warn("The atomic rename feature is not supported by the ABFS scheme;"
>>           + " however rename, create and delete operations are atomic if"
>>           + " Namespace is enabled for your Azure Storage account.");
>>     }
>>
>> From my reading of the SPARK-23966 PR, it's the object store problem
>> which is being addressed: both correctness and performance.
>>
>>
>> Anything I am missing here?
>> Really curious to know which corner cases we are trying to solve with this
>> new pull request.
>>
>>
>>
>> Object stores as the back end. For S3 in particular, where that rename is
>> O(data), a direct PUT to the destination gives you that atomicity.
>>
>>
>> Someone needs to sit down and write that reference implementation.
>>
>> Whoever does want to do that:
>>
>> - I believe it can all be done with the normal Hadoop FS APIs, simply
>> knowing that, for the store, OutputStream.close() is (a) atomic, (b)
>> potentially really slow as the remaining data gets uploaded and (c) when it
>> fails, can mean all your data just got lost (see the sketch after this list).
>> - I've got the TLA+ spec for the S3 API which they can use as the
>> foundation for their proofs of correctness
>> https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf
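>>
>> As a hedged illustration of that direct-write model (a sketch only, under
>> the close()-is-commit assumption above; not the reference implementation):
>>
>>     // Direct-write commit: OutputStream.close() is the commit point
>>     // (a single visible PUT on an S3-like store).
>>     def directWriteCommit(fs: FileSystem, finalPath: Path)
>>                          (write: java.io.OutputStream => Unit): Unit = {
>>       val out = fs.create(finalPath, true)
>>       write(out)   // on an object store, nothing is visible at finalPath yet
>>       // close() uploads the remaining data: potentially very slow, atomic
>>       // when it succeeds, and if it throws, assume the data was never written.
>>       out.close()
>>     }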
>>
>>
>>
>> -Steve
>>
>
>
> --
> Chandan Prakash
>
>
