deniskuzZ commented on code in PR #4899:
URL: https://github.com/apache/hive/pull/4899#discussion_r2321658402
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java:
##########
@@ -293,6 +293,13 @@ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
rproc.run();
perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
+
+ // Try to call canCommit to AM. If there is no other speculative attempt execute canCommit, then continue.
+ // If there are other speculative attempt execute canCommit first, then wait until the attempt is killed
+ // or the committed task fails.
+ while (!this.processorContext.canCommit()) {
Review Comment:
[TaskImpl::canCommit](https://github.com/apache/tez/blob/6683866ea5e9a7b900ccbcd617e49588974019ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java#L694)
is already wrapped with `writeLock`, so that part is OK.
However, if `commitAttempt != taskAttemptID` (a speculative attempt has already
been committed), `canCommit` takes the branch quoted below and returns `false`.
How are we planning to leave the loop
`while (!this.processorContext.canCommit()) {Thread.sleep(100);}` in that case?
````
LOG.debug("{} is current committer. Commit waiting for: {}", commitAttempt, taskAttemptID);
return false;
````
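One way to make the exit condition explicit would be to bound the wait. The following is a minimal sketch, not the PR's code: `CommitGate`, `BoundedCommitWait`, `awaitCommit`, and the timeout and poll parameters are hypothetical names introduced for illustration, with `CommitGate.canCommit()` standing in for `processorContext.canCommit()`. It polls until the commit is granted or a deadline passes, and it lets `InterruptedException` propagate so that an interrupted task thread also leaves the loop.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the single call used by the loop under review. */
interface CommitGate {
  boolean canCommit() throws IOException;
}

final class BoundedCommitWait {
  private BoundedCommitWait() {}

  /**
   * Polls the gate until it grants the commit or the deadline passes.
   *
   * @return true if the commit was granted, false if the wait timed out
   */
  static boolean awaitCommit(CommitGate gate, long timeoutMillis, long pollMillis)
      throws IOException, InterruptedException {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!gate.canCommit()) {
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        // Assumption: the caller decides what a timed-out attempt should do.
        return false;
      }
      // Sleep for the poll interval, but never past the deadline.
      long remainingMillis = Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
      // An interrupt (for example when the attempt is being torn down) ends the wait.
      Thread.sleep(Math.min(pollMillis, remainingMillis));
    }
    return true;
  }
}
```

With something like this, the caller would still have to decide what a `false` return means for the attempt (for example, failing it instead of committing); that policy is outside the sketch.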
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]