This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 16dedb9a [#992] fix(tez): convertTaskAttemptIdToLong should not 
consider appattemptId (#1007)
16dedb9a is described below

commit 16dedb9a2a862df0065fdd87c5724ca44edef8d9
Author: zhengchenyu <[email protected]>
AuthorDate: Mon Jul 17 15:05:27 2023 +0800

    [#992] fix(tez): convertTaskAttemptIdToLong should not consider 
appattemptId (#1007)
    
    ### What changes were proposed in this pull request?
    
    When converting a task attempt id to a long, the code computed the real 
taskAttemptID as taskAttemptID - (appAttemptId - 1) * 1000. This logic only 
applies to MR; Tez should not use it, so it is removed.
    
    If appAttemptId is 2, the task attempt id will be normalized to a negative 
value, which is unexpected.
    
    Fix: #992
---
 .../src/main/java/org/apache/tez/common/RssTezUtils.java | 16 ++++------------
 .../tez/runtime/library/common/sort/impl/RssSorter.java  |  4 +---
 .../runtime/library/common/sort/impl/RssUnSorter.java    |  4 +---
 .../test/java/org/apache/tez/common/RssTezUtilsTest.java |  8 ++++----
 4 files changed, 10 insertions(+), 22 deletions(-)

diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java 
b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index c507d7e2..c44080cb 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -287,23 +287,15 @@ public class RssTezUtils {
     }
   }
 
-  public static long convertTaskAttemptIdToLong(TezTaskAttemptID 
taskAttemptID, int appAttemptId) {
+  public static long convertTaskAttemptIdToLong(TezTaskAttemptID 
taskAttemptID) {
     long lowBytes = taskAttemptID.getTaskID().getId();
     if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
       throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + 
lowBytes + " exceed");
     }
-    if (appAttemptId < 1) {
-      throw new RssException("appAttemptId  " + appAttemptId + " is wrong");
-    }
-    long highBytes = (long) taskAttemptID.getId() - (appAttemptId - 1) * 1000;
+    long highBytes = taskAttemptID.getId();
     if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
       throw new RssException(
-          "TaskAttempt "
-              + taskAttemptID
-              + " high bytes "
-              + highBytes
-              + " exceed, appAttemptId:"
-              + appAttemptId);
+          "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " 
exceed.");
     }
     long id =
         (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + 
Constants.PARTITION_ID_MAX_LENGTH))
@@ -323,7 +315,7 @@ public class RssTezUtils {
     for (InputAttemptIdentifier inputAttemptIdentifier : 
successMapTaskAttempts) {
       String pathComponent = inputAttemptIdentifier.getPathComponent();
       TezTaskAttemptID mapTaskAttemptID = 
IdUtils.convertTezTaskAttemptID(pathComponent);
-      long rssTaskId = 
RssTezUtils.convertTaskAttemptIdToLong(mapTaskAttemptID, appAttemptId);
+      long rssTaskId = 
RssTezUtils.convertTaskAttemptIdToLong(mapTaskAttemptID);
       long mapTaskId = mapTaskAttemptID.getTaskID().getId();
 
       LOG.info(
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
index 20750c54..8e8aed3d 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.common.IdUtils;
 import org.apache.tez.common.RssTezConfig;
 import org.apache.tez.common.RssTezUtils;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -84,8 +83,7 @@ public class RssSorter extends ExternalSorter {
         conf.getDouble(
             RssTezConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
             RssTezConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
-    long taskAttemptId =
-        RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID, 
IdUtils.getAppAttemptId());
+    long taskAttemptId = 
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID);
 
     long maxSegmentSize =
         conf.getLong(
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
index 8c53cd1e..c955bd24 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.common.IdUtils;
 import org.apache.tez.common.RssTezConfig;
 import org.apache.tez.common.RssTezUtils;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -83,8 +82,7 @@ public class RssUnSorter extends ExternalSorter {
         conf.getDouble(
             RssTezConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
             RssTezConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
-    long taskAttemptId =
-        RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID, 
IdUtils.getAppAttemptId());
+    long taskAttemptId = 
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID);
     long maxSegmentSize =
         conf.getLong(
             RssTezConfig.RSS_CLIENT_MAX_BUFFER_SIZE,
diff --git 
a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java 
b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
index 947d4d8e..3aa36bd5 100644
--- a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -55,7 +55,7 @@ public class RssTezUtilsTest {
 
     boolean isException = false;
     try {
-      RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+      RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
     } catch (RssException e) {
       isException = true;
     }
@@ -65,7 +65,7 @@ public class RssTezUtilsTest {
     tezTaskAttemptId = TezTaskAttemptID.getInstance(taskId, 2);
     isException = false;
     try {
-      RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+      RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
     } catch (RssException e) {
       isException = true;
     }
@@ -79,7 +79,7 @@ public class RssTezUtilsTest {
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
     TezTaskID tId = TezTaskID.getInstance(vId, 389);
     TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
-    long taskAttemptId = 
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+    long taskAttemptId = 
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
     long blockId = RssTezUtils.getBlockId(1, taskAttemptId, 0);
     long newTaskAttemptId = RssTezUtils.getTaskAttemptId(blockId);
     assertEquals(taskAttemptId, newTaskAttemptId);
@@ -95,7 +95,7 @@ public class RssTezUtilsTest {
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
     TezTaskID tId = TezTaskID.getInstance(vId, 389);
     TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
-    long taskAttemptId = 
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
+    long taskAttemptId = 
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
     long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
     for (int partitionId = 0; partitionId <= 3000; partitionId++) {
       for (int seqNo = 0; seqNo <= 10; seqNo++) {

Reply via email to