EnricoMi commented on code in PR #1418:
URL: 
https://github.com/apache/incubator-uniffle/pull/1418#discussion_r1533425742


##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -44,37 +44,40 @@ public class RssMRUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
   private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
-  private static final int MAX_ATTEMPT_LENGTH = 6;
+  private static final int MAX_ATTEMPT_LENGTH = 4;
   private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
-  private static final int MAX_SEQUENCE_NO =
-      (1 << (LAYOUT.sequenceNoBits - MAX_ATTEMPT_LENGTH)) - 1;
+  private static final int MAX_TASK_LENGTH = LAYOUT.taskAttemptIdBits - 
MAX_ATTEMPT_LENGTH;
+  private static final int MAX_TASK_ID = (1 << MAX_TASK_LENGTH) - 1;
 
-  // Class TaskAttemptId have two field id and mapId, rss taskAttemptID have 
21 bits,
-  // mapId is 19 bits, id is 2 bits. MR have a trick logic, taskAttemptId will 
increase
-  // 1000 * (appAttemptId - 1), so we will decrease it.
+  // Class TaskAttemptId have two field id and mapId. MR have a trick logic, 
taskAttemptId will
+  // increase 1000 * (appAttemptId - 1), so we will decrease it.
   public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID, 
int appAttemptId) {
-    int lowBytes = taskAttemptID.getTaskID().getId();
-    if (lowBytes > LAYOUT.maxTaskAttemptId) {
-      throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + 
lowBytes + " exceed");
-    }
     if (appAttemptId < 1) {
       throw new RssException("appAttemptId " + appAttemptId + " is wrong");
     }
-    int highBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000;
-    if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
+    long lowBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000L;
+    if (lowBytes > MAX_ATTEMPT_ID || lowBytes < 0) {
+      throw new RssException(
+          "TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed 
" + MAX_ATTEMPT_ID);
+    }
+    long highBytes = taskAttemptID.getTaskID().getId();
+    if (highBytes > MAX_TASK_ID || highBytes < 0) {
       throw new RssException(
-          "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " 
exceed");
+          "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " 
exceed " + MAX_TASK_ID);
     }
-    return LAYOUT.getBlockId(highBytes, 0, lowBytes);
+    long taskAttemptId = (highBytes << (MAX_ATTEMPT_LENGTH)) + lowBytes;
+    return LAYOUT.getBlockId(0, 0, taskAttemptId);
   }
 
   public static TaskAttemptID createMRTaskAttemptId(
       JobID jobID, TaskType taskType, long rssTaskAttemptId, int appAttemptId) 
{
     if (appAttemptId < 1) {
       throw new RssException("appAttemptId " + appAttemptId + " is wrong");
     }
-    TaskID taskID = new TaskID(jobID, taskType, 
LAYOUT.getTaskAttemptId(rssTaskAttemptId));
-    int id = LAYOUT.getSequenceNo(rssTaskAttemptId) + 1000 * (appAttemptId - 
1);
+    int task = LAYOUT.getTaskAttemptId(rssTaskAttemptId) >> MAX_ATTEMPT_LENGTH;

Review Comment:
   Given `rssTaskAttemptId` is not a blockId, it is already the high and low 
bytes only, not shifted in any sense:
   ```suggestion
       int task = rssTaskAttemptId >> MAX_ATTEMPT_LENGTH;
   ```



##########
client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java:
##########
@@ -60,10 +60,10 @@ public class RssTezUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(RssTezUtils.class);
   private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
-  private static final int MAX_ATTEMPT_LENGTH = 6;
-  private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
-  private static final int MAX_SEQUENCE_NO =
-      (1 << (LAYOUT.sequenceNoBits - MAX_ATTEMPT_LENGTH)) - 1;
+  public static final int MAX_ATTEMPT_LENGTH = 4;
+  public static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;

Review Comment:
   ```suggestion
     public static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
   ```



##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -228,27 +231,33 @@ public static String getString(Configuration rssJobConf, 
String key, String defa
   }
 
   public static long getBlockId(int partitionId, long taskAttemptId, int 
nextSeqNo) {

Review Comment:
   The assertions in this method are redundant, since calling 
`LAYOUT.getBlockId` already provides them. All this method needs to do is call
   
       return LAYOUT.getBlockId(nextSeqNo, partitionId, taskAttemptId);
   
   Same in tez client.



##########
client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java:
##########
@@ -277,16 +275,16 @@ private static int mapVertexId(String vertexName) {
   }
 
   public static long convertTaskAttemptIdToLong(TezTaskAttemptID 
taskAttemptID) {
-    int lowBytes = taskAttemptID.getTaskID().getId();
-    if (lowBytes > LAYOUT.maxTaskAttemptId) {
+    int lowBytes = taskAttemptID.getId();
+    if (lowBytes > MAX_ATTEMPT_ID || lowBytes < 0) {
       throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + 
lowBytes + " exceed");
     }
-    int highBytes = taskAttemptID.getId();
-    if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
+    int highBytes = taskAttemptID.getTaskID().getId();
+    if (highBytes > MAX_TASK_ID || highBytes < 0) {
       throw new RssException(
           "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " 
exceed.");
     }
-    long id = LAYOUT.getBlockId(highBytes, 0, lowBytes);
+    long id = (long) highBytes << MAX_ATTEMPT_LENGTH + lowBytes;

Review Comment:
   ```suggestion
       int id = highBytes << MAX_ATTEMPT_LENGTH + lowBytes;
   ```



##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -44,37 +44,40 @@ public class RssMRUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
   private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
-  private static final int MAX_ATTEMPT_LENGTH = 6;
+  private static final int MAX_ATTEMPT_LENGTH = 4;
   private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
-  private static final int MAX_SEQUENCE_NO =
-      (1 << (LAYOUT.sequenceNoBits - MAX_ATTEMPT_LENGTH)) - 1;
+  private static final int MAX_TASK_LENGTH = LAYOUT.taskAttemptIdBits - 
MAX_ATTEMPT_LENGTH;
+  private static final int MAX_TASK_ID = (1 << MAX_TASK_LENGTH) - 1;
 
-  // Class TaskAttemptId have two field id and mapId, rss taskAttemptID have 
21 bits,
-  // mapId is 19 bits, id is 2 bits. MR have a trick logic, taskAttemptId will 
increase
-  // 1000 * (appAttemptId - 1), so we will decrease it.
+  // Class TaskAttemptId have two field id and mapId. MR have a trick logic, 
taskAttemptId will
+  // increase 1000 * (appAttemptId - 1), so we will decrease it.
   public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID, 
int appAttemptId) {

Review Comment:
   I'd suggest improving the naming.
   
   We generate a taskAttemptId to be used in blockId / RSS, which should be 
called `RssTaskAttemptId`:
   
   ```suggestion
     public static int createRssTaskAttemptId(TaskAttemptID taskAttemptID, int 
appAttemptId) {
   ```
   
   The reverse operation is getting an MR / TEZ taskAttemptId from the 
`RssTaskAttemptId`.
   
   We already have the reverse method that follows that naming convention:
   
       public static TaskAttemptID createMRTaskAttemptId(...)
   
   With that naming convention, it is clear which type of task attempt id is 
meant in each of these methods.



##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -44,37 +44,40 @@ public class RssMRUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
   private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
-  private static final int MAX_ATTEMPT_LENGTH = 6;
+  private static final int MAX_ATTEMPT_LENGTH = 4;
   private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
-  private static final int MAX_SEQUENCE_NO =
-      (1 << (LAYOUT.sequenceNoBits - MAX_ATTEMPT_LENGTH)) - 1;
+  private static final int MAX_TASK_LENGTH = LAYOUT.taskAttemptIdBits - 
MAX_ATTEMPT_LENGTH;
+  private static final int MAX_TASK_ID = (1 << MAX_TASK_LENGTH) - 1;
 
-  // Class TaskAttemptId have two field id and mapId, rss taskAttemptID have 
21 bits,
-  // mapId is 19 bits, id is 2 bits. MR have a trick logic, taskAttemptId will 
increase
-  // 1000 * (appAttemptId - 1), so we will decrease it.
+  // Class TaskAttemptId have two field id and mapId. MR have a trick logic, 
taskAttemptId will
+  // increase 1000 * (appAttemptId - 1), so we will decrease it.
   public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID, 
int appAttemptId) {
-    int lowBytes = taskAttemptID.getTaskID().getId();
-    if (lowBytes > LAYOUT.maxTaskAttemptId) {
-      throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + 
lowBytes + " exceed");
-    }
     if (appAttemptId < 1) {
       throw new RssException("appAttemptId " + appAttemptId + " is wrong");
     }
-    int highBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000;
-    if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
+    long lowBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000L;
+    if (lowBytes > MAX_ATTEMPT_ID || lowBytes < 0) {
+      throw new RssException(
+          "TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed 
" + MAX_ATTEMPT_ID);
+    }
+    long highBytes = taskAttemptID.getTaskID().getId();
+    if (highBytes > MAX_TASK_ID || highBytes < 0) {
       throw new RssException(
-          "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " 
exceed");
+          "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " 
exceed " + MAX_TASK_ID);
     }
-    return LAYOUT.getBlockId(highBytes, 0, lowBytes);
+    long taskAttemptId = (highBytes << (MAX_ATTEMPT_LENGTH)) + lowBytes;
+    return LAYOUT.getBlockId(0, 0, taskAttemptId);

Review Comment:
   The `taskAttemptId` computed here is already the final result and is not 
shifted in any sense, so there is no need to use `getBlockId` here:
   ```suggestion
       return (highBytes << (MAX_ATTEMPT_LENGTH)) + lowBytes;
   ```



##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -44,37 +44,40 @@ public class RssMRUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
   private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
-  private static final int MAX_ATTEMPT_LENGTH = 6;
+  private static final int MAX_ATTEMPT_LENGTH = 4;
   private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
-  private static final int MAX_SEQUENCE_NO =
-      (1 << (LAYOUT.sequenceNoBits - MAX_ATTEMPT_LENGTH)) - 1;
+  private static final int MAX_TASK_LENGTH = LAYOUT.taskAttemptIdBits - 
MAX_ATTEMPT_LENGTH;
+  private static final int MAX_TASK_ID = (1 << MAX_TASK_LENGTH) - 1;
 
-  // Class TaskAttemptId have two field id and mapId, rss taskAttemptID have 
21 bits,
-  // mapId is 19 bits, id is 2 bits. MR have a trick logic, taskAttemptId will 
increase
-  // 1000 * (appAttemptId - 1), so we will decrease it.
+  // Class TaskAttemptId have two field id and mapId. MR have a trick logic, 
taskAttemptId will
+  // increase 1000 * (appAttemptId - 1), so we will decrease it.
   public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID, 
int appAttemptId) {

Review Comment:
   Given `LAYOUT.taskAttemptIdBits` is at most `31`, `MAX_TASK_LENGTH` is at 
most `27`, hence the generated taskAttemptId always fits into an int.
   
   Same in tez client.



##########
client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java:
##########
@@ -277,16 +275,16 @@ private static int mapVertexId(String vertexName) {
   }
 
   public static long convertTaskAttemptIdToLong(TezTaskAttemptID 
taskAttemptID) {

Review Comment:
   ```suggestion
     public static int createRssTaskAttemptId(TezTaskAttemptID taskAttemptID) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to