EnricoMi commented on code in PR #1418:
URL:
https://github.com/apache/incubator-uniffle/pull/1418#discussion_r1570282967
##########
common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java:
##########
@@ -143,7 +143,7 @@ public int hashCode() {
return Objects.hash(sequenceNoBits, partitionIdBits, taskAttemptIdBits);
}
- public long getBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
+ public long getBlockId(int sequenceNo, int partitionId, int taskAttemptId) {
Review Comment:
I think this `long` -> `int` change should be reverted, because this is where we
check that the original `long` task attempt id fits within the block id layout constraints.
Only task attempt ids used after the block id layout has accepted them are reduced to `int`.
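The sketch below is hypothetical (it is not Uniffle's actual `BlockIdLayout`) and shows why the check needs the original `long`. It assumes an 18/24/21-bit split for sequence number, partition id and task attempt id, with the sequence number in the highest bits. The class name `CheckedBlockIdLayout` and its messages are made up. With a `long` parameter, an id that does not fit is rejected. With an `int` parameter, the caller would have to narrow first, and the truncated value could pass the check silently.

```java
// Hypothetical sketch, not Uniffle's BlockIdLayout. Bit widths (18/24/21) are assumed.
final class CheckedBlockIdLayout {
  static final int SEQUENCE_NO_BITS = 18;
  static final int PARTITION_ID_BITS = 24;
  static final int TASK_ATTEMPT_ID_BITS = 21;

  static final long MAX_SEQUENCE_NO = (1L << SEQUENCE_NO_BITS) - 1;
  static final long MAX_PARTITION_ID = (1L << PARTITION_ID_BITS) - 1;
  static final long MAX_TASK_ATTEMPT_ID = (1L << TASK_ATTEMPT_ID_BITS) - 1;

  // The task attempt id stays long so the range check sees the caller's original value.
  static long getBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
    check("sequenceNo", sequenceNo, MAX_SEQUENCE_NO);
    check("partitionId", partitionId, MAX_PARTITION_ID);
    check("taskAttemptId", taskAttemptId, MAX_TASK_ATTEMPT_ID);
    // Assumed packing: sequenceNo | partitionId | taskAttemptId, from high to low bits.
    return ((long) sequenceNo << (PARTITION_ID_BITS + TASK_ATTEMPT_ID_BITS))
        | ((long) partitionId << TASK_ATTEMPT_ID_BITS)
        | taskAttemptId;
  }

  private static void check(String name, long value, long max) {
    if (value < 0 || value > max) {
      throw new IllegalArgumentException(name + " " + value + " out of range [0, " + max + "]");
    }
  }

  public static void main(String[] args) {
    long tooLarge = (1L << 32) + 5; // fits neither into 21 bits nor into an int
    try {
      getBlockId(0, 0, tooLarge);
      System.out.println("accepted");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    // With an int parameter the caller must narrow first; the low 32 bits (5)
    // would then pass the range check as if they were a valid id.
    System.out.println("narrowed: " + (int) tooLarge);
  }
}
```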
##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -227,28 +252,12 @@ public static String getString(Configuration rssJobConf, String key, String defa
return rssJobConf.get(key, defaultValue);
}
- public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
- long attemptId = taskAttemptId >> (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits);
- if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
- throw new RssException(
- "Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
- }
- if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
- throw new RssException(
- "Can't support sequence [" + nextSeqNo + "], the max value should be " + MAX_SEQUENCE_NO);
- }
-
- int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
- long taskId =
- taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));
-
- return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
+ public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
Review Comment:
Technically, the `taskAttemptId` can be `long` here, as this is *before* the block
id layout checks the bit size constraint (though we only feed this method with
`int taskAttemptId`s produced by `RssMRUtils.createRssTaskAttemptId()`):
```suggestion
public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
```
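Here is a hypothetical sketch of the suggested signature. The utility keeps `long taskAttemptId` and delegates straight to the layout, which does the range check. `layoutGetBlockId` is a made-up stand-in for the layout, and it assumes an 18/24/21-bit split. An `int` id, such as one produced by `RssMRUtils.createRssTaskAttemptId()`, widens to `long` implicitly, so existing callers compile unchanged.

```java
// Hypothetical sketch: layoutGetBlockId stands in for the block id layout;
// the 18/24/21-bit split is an assumption.
final class LongParamBlockIdSketch {
  static final int SEQUENCE_NO_BITS = 18;
  static final int PARTITION_ID_BITS = 24;
  static final int TASK_ATTEMPT_ID_BITS = 21;

  // Stand-in for the layout: the only place that enforces the bit-size constraint.
  static long layoutGetBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
    if (sequenceNo < 0 || sequenceNo >= (1 << SEQUENCE_NO_BITS)
        || partitionId < 0 || partitionId >= (1 << PARTITION_ID_BITS)
        || taskAttemptId < 0 || taskAttemptId >= (1L << TASK_ATTEMPT_ID_BITS)) {
      throw new IllegalArgumentException("value does not fit the block id layout");
    }
    return ((long) sequenceNo << (PARTITION_ID_BITS + TASK_ATTEMPT_ID_BITS))
        | ((long) partitionId << TASK_ATTEMPT_ID_BITS)
        | taskAttemptId;
  }

  // Mirrors the suggested signature: no narrowing and no own check, just delegation.
  static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
    return layoutGetBlockId(nextSeqNo, partitionId, taskAttemptId);
  }

  public static void main(String[] args) {
    int intAttemptId = 42; // an int id, as an existing caller would pass it
    long blockId = getBlockId(7, intAttemptId, 3); // int -> long widening is implicit
    System.out.println(blockId);
  }
}
```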
##########
client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java:
##########
@@ -158,33 +156,12 @@ public static String uniqueIdentifierToAttemptId(String uniqueIdentifier) {
return StringUtils.join(ids, "_", 0, 7);
}
- public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
- LOG.info(
- "GetBlockId, partitionId:{}, taskAttemptId:{}, nextSeqNo:{}",
- partitionId,
- taskAttemptId,
- nextSeqNo);
- long attemptId = taskAttemptId >> (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits);
- if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
- throw new RssException(
- "Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
- }
- if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
- throw new RssException(
- "Can't support sequence [" + nextSeqNo + "], the max value should be " + MAX_SEQUENCE_NO);
- }
-
- int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
- long taskId =
- taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));
-
- return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
+ public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
+ return LAYOUT.getBlockId(nextSeqNo, partitionId, taskAttemptId);
}
public static long getTaskAttemptId(long blockId) {
Review Comment:
This task attempt id is derived from the block id, so its bit size is
reduced:
```suggestion
public static int getTaskAttemptId(long blockId) {
```
Callers of this method can continue to upcast the returned `int` to
`long` without any problem.
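Here is a minimal sketch of why the narrower return type is safe. It assumes the task attempt id occupies the lowest 21 bits of the block id, which is an assumed layout and not necessarily Uniffle's. Masking yields a value below 2^21, so the cast to `int` cannot lose information, and a caller can still assign the result to a `long`. The class name is hypothetical.

```java
// Hypothetical sketch; the 21-bit task attempt id width and its position are assumptions.
final class TaskAttemptIdExtractSketch {
  static final int TASK_ATTEMPT_ID_BITS = 21;
  static final long TASK_ATTEMPT_ID_MASK = (1L << TASK_ATTEMPT_ID_BITS) - 1;

  // The masked value has at most 21 significant bits, so narrowing to int is lossless.
  static int getTaskAttemptId(long blockId) {
    return (int) (blockId & TASK_ATTEMPT_ID_MASK);
  }

  public static void main(String[] args) {
    long blockId = (5L << 45) | (9L << 21) | 1234L; // seq 5, partition 9, attempt 1234
    int attempt = getTaskAttemptId(blockId);
    long widened = getTaskAttemptId(blockId); // a caller working with long just upcasts
    System.out.println(attempt + " " + widened);
  }
}
```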
##########
client-mr/core/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java:
##########
@@ -64,7 +64,7 @@ public class SortWriteBufferManager<K, V> {
private final Counters.Counter mapOutputRecordCounter;
private long uncompressedDataLen = 0;
private long compressTime = 0;
- private final long taskAttemptId;
+ private final int taskAttemptId;
Review Comment:
I am not sure about restricting `taskAttemptIds` to `int` in such places.
Here is the situation:
1. Spark, Tez and MR provide us with `long` task attempt ids (for Tez and
MR, (taskId, attemptId) constitutes a `long` task attempt id, which we restrict
to `int` for similar reasons as in 2.)
2. For the purpose of the block id, we limit those `long` task attempt ids
to `int`, since we allow fewer than 32 bits for them.
3. Because of that, the task attempt id retrieved from the block id is `int`.
4. Still, all other places could continue to work with `long` task attempt
ids if that makes no difference to that code; up-casting `int` task attempt
ids to `long` does no harm, as long as the code works with `long`.
This allows us to support truly `long` task attempt ids without reverting such
code changes in the future.
@zuston @jerqi @zhengchenyu what do you think?
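The four points can be sketched roughly as follows. This is a hypothetical illustration with made-up names (`toBlockId`, `taskAttemptIdOf`) and an assumed 24-bit partition id / 21-bit task attempt id layout. The `long` id is checked and narrowed only when it is encoded into the block id. The id read back is an `int`, and code that works with `long` simply widens it.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; names and bit widths are assumptions for illustration.
final class LongAttemptIdFlowSketch {
  static final int PARTITION_ID_BITS = 24;
  static final int TASK_ATTEMPT_ID_BITS = 21;

  // Point 2: only the block id encoding restricts the id (other range checks omitted here).
  static long toBlockId(int seq, int partition, long taskAttemptId) {
    if (taskAttemptId < 0 || taskAttemptId >= (1L << TASK_ATTEMPT_ID_BITS)) {
      throw new IllegalArgumentException("task attempt id " + taskAttemptId + " exceeds layout");
    }
    return ((long) seq << (PARTITION_ID_BITS + TASK_ATTEMPT_ID_BITS))
        | ((long) partition << TASK_ATTEMPT_ID_BITS)
        | taskAttemptId;
  }

  // Point 3: the id read back from a block id is an int.
  static int taskAttemptIdOf(long blockId) {
    return (int) (blockId & ((1L << TASK_ATTEMPT_ID_BITS) - 1));
  }

  public static void main(String[] args) {
    // Points 1 and 4: other code (a writer's field, a set of expected attempts) stays long.
    final long writerTaskAttemptId = 1_000L;
    Set<Long> expectedAttempts = new HashSet<>();
    expectedAttempts.add(writerTaskAttemptId);

    long blockId = toBlockId(0, 3, writerTaskAttemptId);
    long readBack = taskAttemptIdOf(blockId); // explicit widening to long
    System.out.println(expectedAttempts.contains(readBack));
    // Passing the int directly would box it to Integer, which never equals a Long element.
    System.out.println(expectedAttempts.contains(taskAttemptIdOf(blockId)));
  }
}
```

One caveat when up-casting: with boxed collections such as `Set<Long>`, the `int` has to be widened to `long` before the lookup. Otherwise it is boxed as an `Integer` and never matches.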
##########
client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java:
##########
@@ -158,33 +156,12 @@ public static String uniqueIdentifierToAttemptId(String uniqueIdentifier) {
return StringUtils.join(ids, "_", 0, 7);
}
- public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
- LOG.info(
- "GetBlockId, partitionId:{}, taskAttemptId:{}, nextSeqNo:{}",
- partitionId,
- taskAttemptId,
- nextSeqNo);
- long attemptId = taskAttemptId >> (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits);
- if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
- throw new RssException(
- "Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
- }
- if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
- throw new RssException(
- "Can't support sequence [" + nextSeqNo + "], the max value should be " + MAX_SEQUENCE_NO);
- }
-
- int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
- long taskId =
- taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));
-
- return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
+ public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
Review Comment:
Technically, the `taskAttemptId` can be `long` here, as this is *before* the block
id layout checks the bit size constraint (though we only feed this method with
`int taskAttemptId`s produced by `RssTezUtils.createRssTaskAttemptId()`):
```suggestion
public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
```
##########
common/src/main/java/org/apache/uniffle/common/util/BlockIdLayout.java:
##########
@@ -185,13 +185,13 @@ public BlockId asBlockId(long blockId) {
blockId, this, getSequenceNo(blockId), getPartitionId(blockId),
getTaskAttemptId(blockId));
}
- public BlockId asBlockId(int sequenceNo, int partitionId, long taskAttemptId) {
Review Comment:
same here
##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -227,28 +252,12 @@ public static String getString(Configuration rssJobConf, String key, String defa
return rssJobConf.get(key, defaultValue);
}
- public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
- long attemptId = taskAttemptId >> (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits);
- if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
- throw new RssException(
- "Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
- }
- if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
- throw new RssException(
- "Can't support sequence [" + nextSeqNo + "], the max value should be " + MAX_SEQUENCE_NO);
- }
-
- int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
- long taskId =
- taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));
-
- return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
+ public static long getBlockId(int partitionId, int taskAttemptId, int nextSeqNo) {
+ return LAYOUT.getBlockId(nextSeqNo, partitionId, taskAttemptId);
}
public static long getTaskAttemptId(long blockId) {
Review Comment:
This task attempt id is derived from the block id, so its bit size is
reduced:
```suggestion
public static int getTaskAttemptId(long blockId) {
```
Callers of this method can continue to upcast the returned `int` to
`long` without any problem.