This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 16dedb9a [#992] fix(tez): convertTaskAttemptIdToLong should not
consider appattemptId (#1007)
16dedb9a is described below
commit 16dedb9a2a862df0065fdd87c5724ca44edef8d9
Author: zhengchenyu <[email protected]>
AuthorDate: Mon Jul 17 15:05:27 2023 +0800
[#992] fix(tez): convertTaskAttemptIdToLong should not consider
appattemptId (#1007)
### What changes were proposed in this pull request?
When we convert task attempt id to long, consider real taskAttemptID =
taskAttemptID - (appAttemptId - 1) * 1000, the logical only apply to mr. tez
should remove this.
If appAttemptId is 2, taskattemptid will be normalized to negative value,
it is unexpected.
Fix: #992
---
.../src/main/java/org/apache/tez/common/RssTezUtils.java | 16 ++++------------
.../tez/runtime/library/common/sort/impl/RssSorter.java | 4 +---
.../runtime/library/common/sort/impl/RssUnSorter.java | 4 +---
.../test/java/org/apache/tez/common/RssTezUtilsTest.java | 8 ++++----
4 files changed, 10 insertions(+), 22 deletions(-)
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index c507d7e2..c44080cb 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -287,23 +287,15 @@ public class RssTezUtils {
}
}
- public static long convertTaskAttemptIdToLong(TezTaskAttemptID
taskAttemptID, int appAttemptId) {
+ public static long convertTaskAttemptIdToLong(TezTaskAttemptID
taskAttemptID) {
long lowBytes = taskAttemptID.getTaskID().getId();
if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " +
lowBytes + " exceed");
}
- if (appAttemptId < 1) {
- throw new RssException("appAttemptId " + appAttemptId + " is wrong");
- }
- long highBytes = (long) taskAttemptID.getId() - (appAttemptId - 1) * 1000;
+ long highBytes = taskAttemptID.getId();
if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
throw new RssException(
- "TaskAttempt "
- + taskAttemptID
- + " high bytes "
- + highBytes
- + " exceed, appAttemptId:"
- + appAttemptId);
+ "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + "
exceed.");
}
long id =
(highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH +
Constants.PARTITION_ID_MAX_LENGTH))
@@ -323,7 +315,7 @@ public class RssTezUtils {
for (InputAttemptIdentifier inputAttemptIdentifier :
successMapTaskAttempts) {
String pathComponent = inputAttemptIdentifier.getPathComponent();
TezTaskAttemptID mapTaskAttemptID =
IdUtils.convertTezTaskAttemptID(pathComponent);
- long rssTaskId =
RssTezUtils.convertTaskAttemptIdToLong(mapTaskAttemptID, appAttemptId);
+ long rssTaskId =
RssTezUtils.convertTaskAttemptIdToLong(mapTaskAttemptID);
long mapTaskId = mapTaskAttemptID.getTaskID().getId();
LOG.info(
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
index 20750c54..8e8aed3d 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.common.IdUtils;
import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -84,8 +83,7 @@ public class RssSorter extends ExternalSorter {
conf.getDouble(
RssTezConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
RssTezConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
- long taskAttemptId =
- RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID,
IdUtils.getAppAttemptId());
+ long taskAttemptId =
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID);
long maxSegmentSize =
conf.getLong(
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
index 8c53cd1e..c955bd24 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.common.IdUtils;
import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -83,8 +82,7 @@ public class RssUnSorter extends ExternalSorter {
conf.getDouble(
RssTezConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
RssTezConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
- long taskAttemptId =
- RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID,
IdUtils.getAppAttemptId());
+ long taskAttemptId =
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID);
long maxSegmentSize =
conf.getLong(
RssTezConfig.RSS_CLIENT_MAX_BUFFER_SIZE,
diff --git
a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
index 947d4d8e..3aa36bd5 100644
--- a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -55,7 +55,7 @@ public class RssTezUtilsTest {
boolean isException = false;
try {
- RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+ RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
} catch (RssException e) {
isException = true;
}
@@ -65,7 +65,7 @@ public class RssTezUtilsTest {
tezTaskAttemptId = TezTaskAttemptID.getInstance(taskId, 2);
isException = false;
try {
- RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+ RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
} catch (RssException e) {
isException = true;
}
@@ -79,7 +79,7 @@ public class RssTezUtilsTest {
TezVertexID vId = TezVertexID.getInstance(dagId, 35);
TezTaskID tId = TezTaskID.getInstance(vId, 389);
TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
- long taskAttemptId =
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+ long taskAttemptId =
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
long blockId = RssTezUtils.getBlockId(1, taskAttemptId, 0);
long newTaskAttemptId = RssTezUtils.getTaskAttemptId(blockId);
assertEquals(taskAttemptId, newTaskAttemptId);
@@ -95,7 +95,7 @@ public class RssTezUtilsTest {
TezVertexID vId = TezVertexID.getInstance(dagId, 35);
TezTaskID tId = TezTaskID.getInstance(vId, 389);
TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
- long taskAttemptId =
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+ long taskAttemptId =
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
for (int partitionId = 0; partitionId <= 3000; partitionId++) {
for (int seqNo = 0; seqNo <= 10; seqNo++) {