[ 
https://issues.apache.org/jira/browse/HIVE-27899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789037#comment-17789037
 ] 

Chenyu Zheng edited comment on HIVE-27899 at 11/23/23 11:16 AM:
----------------------------------------------------------------

[~glapark] 

_Note: Sorry, I have changed my viewpoint; please see the latest comment._

Thanks for your reply! I just implemented a draft patch in our cluster: I added 
a canCommit call right after "TezProcessor::initializeAndRunProcessor -> 
rproc.run();", and it indeed solves my problem.
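
Roughly, the draft looks like this (a simplified sketch, not the exact patch; 
the wait loop follows the same pattern as Tez's SimpleMRProcessor, and the 
sleep interval is just an example):

{code:java}
// Simplified sketch only, not the exact draft patch; exception handling omitted.
// Inside org.apache.hadoop.hive.ql.exec.tez.TezProcessor#initializeAndRunProcessor,
// right after the record processor has finished its work:
rproc.run();

// New: block until the AM grants this attempt the right to commit.
// A speculative sibling attempt will wait here until it is killed,
// so at most one attempt ever reaches fsp.commit.
while (!getContext().canCommit()) {
  Thread.sleep(100);
}
{code}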

Let's reanalyze the problem. The picture below shows the related function 
calls (some calls are omitted).

!image-2023-11-23-16-21-20-244.png|width=848,height=315!
The serial numbers in the figure represent code segments, and we assume that 
exceptions may be thrown in these code segments.
Case A:
If an exception is thrown in (1) or (2), there is no problem.
Case B:
If an exception is thrown in (3) or (4), there is no impact either: even if 
the attempt has already called canCommit successfully, the AM resets 
TaskImpl::commitAttempt to null once the attempt fails.
Case C:
This looks like a problem, but it is not.
If an exception is thrown in (5), (6), or (7), the first attempt has already 
called canCommit and fsp.commit, but it fails without sending a success signal 
to the AM. Another task attempt then waits in a loop on canCommit until the AM 
learns that the first attempt has failed and resets TaskImpl::commitAttempt to 
null.
Because at least one attempt must eventually reach the succeeded state, the 
Hive driver goes on to execute removeTempOrDuplicateFiles, and the failed 
first attempt's commit cannot interleave between getNonEmptySubDirs and 
removeTempOrDuplicateFilesNonMm. removeTempOrDuplicateFiles will therefore 
delete the file left by the first task attempt. So, there won't be any problem.
Case D:
If an exception is thrown at (8), there will be no problem.

 

So I think canCommit will solve this problem.
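
To make Cases B and C concrete, here is a tiny illustrative model of the 
AM-side arbitration I am relying on (this is not the real TaskImpl code, just 
its observable behaviour): the first running attempt that asks gets the commit 
right, and a failed or killed attempt releases it so a later attempt can 
proceed.

{code:java}
// Illustrative model only -- not org.apache.tez.dag.app.dag.impl.TaskImpl.
// It only captures the behaviour the case analysis above depends on.
public class CommitArbiterModel {
  private String commitAttempt; // attempt currently allowed to commit, or null

  // Called by a task attempt (via the canCommit RPC) before fsp.commit.
  public synchronized boolean canCommit(String attemptId) {
    if (commitAttempt == null) {
      commitAttempt = attemptId;            // first caller wins (Case C, attempt 1)
      return true;
    }
    return commitAttempt.equals(attemptId); // other attempts keep polling, get false
  }

  // Called when the AM learns an attempt failed or was killed (Cases B and C):
  // the commit right is released, so the next attempt's canCommit can succeed.
  public synchronized void onAttemptTerminated(String attemptId) {
    if (attemptId.equals(commitAttempt)) {
      commitAttempt = null;
    }
  }
}
{code}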

 

What do you think of this approach? Looking forward to your reply!


was (Author: zhengchenyu):
[~glapark] 

Thanks for your reply! I just implemented a draft patch in our cluster: I added 
a canCommit call right after "TezProcessor::initializeAndRunProcessor -> 
rproc.run();", and it indeed solves my problem.

But I think you are right! Let's reanalyze the problem. The picture below 
shows the related function calls (some calls are omitted).

!image-2023-11-23-16-21-20-244.png|width=848,height=315!
The serial numbers in the figure represent code segments, and we assume that 
exceptions may be thrown in these code segments.
Case A:
If an exception is thrown in (1) or (2), there is no problem.
Case B:
If an exception is thrown in (3) or (4), there is no impact either: even if 
the attempt has already called canCommit successfully, the AM resets 
TaskImpl::commitAttempt to null once the attempt fails.
Case C:
If an exception is thrown in (5), (6), or (7), a real problem can occur: the 
file has already been renamed from the tmp file to the final file, but the 
task attempt has not yet notified the AM that it succeeded, so another task 
attempt will be started, resulting in duplicate files.
Case D:
If an exception is thrown at (8), there will be no problem.
 
 
I see two possible solutions:
Solution A: 
Add a final commit operation in TaskImpl::AttemptSucceededTransition. 
Note: In fact, I don't like adding remote filesystem operations to the state 
machine; they could block the dispatcher.

Solution B: 
If the file has already been committed but the task attempt did not succeed, 
then when the next attempt commits the final file, first delete the file 
generated by the previous attempt and then rename to the final file (see the 
sketch below).
What inspired me is Hadoop MR. I added code that randomly exits the program in 
commitTask, and there was no problem; MR solves the problem in this way. 
MR's final file is named like 'part-r-00000'. The final file is the same for 
all attempts of the same task, so the design is easy to implement.
For now, Hive's final file name, such as "000000_0", is composed of the task 
id and the task attempt id. We would need to rename it the MR way.
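
For illustration, a minimal sketch of what the Solution B commit could look 
like (a hypothetical helper, not existing Hive code; it assumes the final name 
depends only on the task id, like MR's part-r-00000, so every attempt of a 
task targets the same path):

{code:java}
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper illustrating Solution B; not existing Hive code.
// Assumes the final file name is derived from the task id only, so every
// attempt of the same task writes to the same final path.
public final class OverwritingCommit {
  private OverwritingCommit() {}

  public static void commit(FileSystem fs, Path tmpFile, Path finalFile)
      throws IOException {
    // If an earlier attempt already renamed its output but died before
    // reporting success to the AM, remove that file first...
    if (fs.exists(finalFile)) {
      fs.delete(finalFile, false);
    }
    // ...then publish this attempt's output under the shared final name.
    if (!fs.rename(tmpFile, finalFile)) {
      throw new IOException("Failed to rename " + tmpFile + " to " + finalFile);
    }
  }
}
{code}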

What do you think of these approaches? Looking forward to your reply!

> Killed speculative execution task attempt should not commit file
> ----------------------------------------------------------------
>
>                 Key: HIVE-27899
>                 URL: https://issues.apache.org/jira/browse/HIVE-27899
>             Project: Hive
>          Issue Type: Bug
>          Components: Tez
>            Reporter: Chenyu Zheng
>            Assignee: Chenyu Zheng
>            Priority: Major
>         Attachments: image-2023-11-23-16-21-20-244.png, reproduce_bug.md
>
>
> As I mentioned in HIVE-25561, when Tez turns on speculative execution, the 
> data files produced by Hive may be duplicated. I mentioned in HIVE-25561 that 
> if the speculatively executed task is killed, some data may be committed 
> unexpectedly. However, after HIVE-25561, there is still a situation that has 
> not been solved: if two task attempts commit files at the same time, 
> duplicate data files may also occur. Although the probability of this 
> happening is very, very low, it does happen.
>  
> Why?
> There are two key steps:
> (1)FileSinkOperator::closeOp
> TezProcessor::initializeAndRunProcessor --> ... --> FileSinkOperator::closeOp 
> --> fsp.commit
> When the operator is closed, the closing process is triggered, which 
> eventually leads to the call to fsp.commit.
> (2)removeTempOrDuplicateFiles
> (2.a) First, listStatus the files in the temporary directory. 
> (2.b) Second, check whether there are multiple incorrect commits, and finally 
> move the correct results to the final directory.
> When speculative execution is enabled and one attempt of a task completes, 
> the other attempts are killed. However, the AM only sends the kill event and 
> does not ensure that all cleanup actions are completed; that is, closeOp may 
> be executed between (2.a) and (2.b). Therefore, removeTempOrDuplicateFiles 
> will not delete the file generated by the killed attempt.
> How?
> The problem is that both speculatively executed task attempts commit the 
> file. This will not happen in the Tez examples because they call canCommit, 
> which guarantees that one and only one task attempt commits successfully. If 
> one task attempt calls canCommit successfully, the other one will be blocked 
> in canCommit until it receives a kill signal.
> For details, see: 
> [https://github.com/apache/tez/blob/51d6f53967110e2b91b6d90b46f8e16bdc062091/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java#L70]



