EnricoMi commented on code in PR #1418:
URL:
https://github.com/apache/incubator-uniffle/pull/1418#discussion_r1542957936
##########
client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java:
##########
@@ -212,6 +213,16 @@ public List<Event> close() throws Exception {
public void start() throws Exception {
if (!isStarted.get()) {
memoryUpdateCallbackHandler.validateUpdateReceived();
+ int maxFailures =
+ conf.getInt(
+ TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+ TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
+ boolean speculation =
+ conf.getBoolean(
+ TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
+ TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
+ int maxAttemptNo = ClientUtils.getMaxAttemptNo(maxFailures, speculation);
+ int rsstaskAttemptId = RssTezUtils.createRssTaskAttemptId(taskAttemptId,
maxAttemptNo);
Review Comment:
This code is duplicated in a few places; `RssTezUtils` should implement
`createRssTaskAttemptId(taskAttemptId, maxAttemptNo, conf)`.
##########
client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java:
##########
@@ -274,23 +252,38 @@ private static int mapVertexId(String vertexName) {
}
}
- public static long convertTaskAttemptIdToLong(TezTaskAttemptID
taskAttemptID) {
- int lowBytes = taskAttemptID.getId();
- if (lowBytes > MAX_ATTEMPT_ID || lowBytes < 0) {
- throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " +
lowBytes + " exceed");
+ public static int createRssTaskAttemptId(TezTaskAttemptID taskAttemptID, int
maxAttemptNo) {
+ int attemptBits = ClientUtils.getAttemptIdBits(maxAttemptNo);
+
+ int attemptId = taskAttemptID.getId();
+ if (attemptId > maxAttemptNo || attemptId < 0) {
+ throw new RssException(
+ "TaskAttempt " + taskAttemptID + " attemptId " + attemptId + "
exceed");
}
- int highBytes = taskAttemptID.getTaskID().getId();
- if (highBytes > MAX_TASK_ID || highBytes < 0) {
+ int taskId = taskAttemptID.getTaskID().getId();
+
+ int mapIndexBits = 32 - Integer.numberOfLeadingZeros(taskId);
Review Comment:
Same here: reuse `ClientUtils.getNumberOfSignificantBits(taskId)`.
##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -44,38 +45,46 @@ public class RssMRUtils {
private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
- private static final int MAX_ATTEMPT_LENGTH = 4;
- private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
- private static final int MAX_TASK_LENGTH = LAYOUT.taskAttemptIdBits -
MAX_ATTEMPT_LENGTH;
- private static final int MAX_TASK_ID = (1 << MAX_TASK_LENGTH) - 1;
// Class TaskAttemptId have two field id and mapId. MR have a trick logic,
taskAttemptId will
// increase 1000 * (appAttemptId - 1), so we will decrease it.
- public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID,
int appAttemptId) {
+ public static int createRssTaskAttemptId(
+ TaskAttemptID taskAttemptID, int appAttemptId, int maxAttemptNo) {
+ int attemptBits = ClientUtils.getAttemptIdBits(maxAttemptNo);
+
if (appAttemptId < 1) {
throw new RssException("appAttemptId " + appAttemptId + " is wrong");
}
- long lowBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000L;
- if (lowBytes > MAX_ATTEMPT_ID || lowBytes < 0) {
+ int attemptId = taskAttemptID.getId() - (appAttemptId - 1) * 1000;
+ if (attemptId > maxAttemptNo || attemptId < 0) {
throw new RssException(
- "TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed
" + MAX_ATTEMPT_ID);
+ "TaskAttempt " + taskAttemptID + " attemptId " + attemptId + "
exceed " + maxAttemptNo);
}
- long highBytes = taskAttemptID.getTaskID().getId();
- if (highBytes > MAX_TASK_ID || highBytes < 0) {
+ int taskId = taskAttemptID.getTaskID().getId();
+
+ int mapIndexBits = 32 - Integer.numberOfLeadingZeros(taskId);
Review Comment:
This can reuse `ClientUtils.getNumberOfSignificantBits` below:
```suggestion
int mapIndexBits = ClientUtils.getNumberOfSignificantBits(taskId);
```
##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java:
##########
@@ -75,7 +77,11 @@ public Roaring64NavigableMap fetchAllRssTaskIds() {
String errMsg = "TaskAttemptIDs are inconsistent with map tasks";
for (TaskAttemptID taskAttemptID : successMaps) {
if (!obsoleteMaps.contains(taskAttemptID)) {
- long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(taskAttemptID,
appAttemptId);
+ int maxFailures = jobConf.getInt(MRJobConfig.MAP_MAX_ATTEMPTS, 4);
+ boolean speculation = jobConf.getBoolean(MRJobConfig.MAP_SPECULATIVE,
true);
Review Comment:
Default values for `MAP_MAX_ATTEMPTS` and `MAP_SPECULATIVE` are defined in at
least two places: here and in
client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java.
With #1427, default values will be defined once.
##########
client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java:
##########
@@ -274,23 +252,38 @@ private static int mapVertexId(String vertexName) {
}
}
- public static long convertTaskAttemptIdToLong(TezTaskAttemptID
taskAttemptID) {
- int lowBytes = taskAttemptID.getId();
- if (lowBytes > MAX_ATTEMPT_ID || lowBytes < 0) {
- throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " +
lowBytes + " exceed");
+ public static int createRssTaskAttemptId(TezTaskAttemptID taskAttemptID, int
maxAttemptNo) {
+ int attemptBits = ClientUtils.getAttemptIdBits(maxAttemptNo);
+
+ int attemptId = taskAttemptID.getId();
+ if (attemptId > maxAttemptNo || attemptId < 0) {
+ throw new RssException(
+ "TaskAttempt " + taskAttemptID + " attemptId " + attemptId + "
exceed");
}
- int highBytes = taskAttemptID.getTaskID().getId();
- if (highBytes > MAX_TASK_ID || highBytes < 0) {
+ int taskId = taskAttemptID.getTaskID().getId();
+
+ int mapIndexBits = 32 - Integer.numberOfLeadingZeros(taskId);
+ if (mapIndexBits + attemptBits > LAYOUT.taskAttemptIdBits) {
throw new RssException(
- "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + "
exceed.");
+ "Observing taskId["
+ + taskId
+ + "] that would produce a taskAttemptId with "
+ + (mapIndexBits + attemptBits)
+ + " bits which is larger than the allowed "
+ + LAYOUT.taskAttemptIdBits
+ + "]). Please consider providing more bits for taskAttemptIds.");
}
- long id = (long) highBytes << MAX_ATTEMPT_LENGTH + lowBytes;
- LOG.info("ConvertTaskAttemptIdToLong taskAttemptID:{}, id is {}, .",
taskAttemptID, id);
+
+ int id = (taskId << attemptBits) + attemptId;
Review Comment:
Same here:
```suggestion
int id = (taskId << attemptBits) | attemptId;
```
##########
client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java:
##########
@@ -112,4 +112,21 @@ public static void validateClientType(String clientType) {
String.format("The value of %s should be one of %s", clientType,
types));
}
}
+
+ public static int getMaxAttemptNo(int maxFailures, boolean speculation) {
+ // attempt number is zero based: 0, 1, …, maxFailures-1
+ // max maxFailures < 1 is not allowed but for safety, we interpret that as
maxFailures == 1
+ int maxAttemptNo = maxFailures < 1 ? 0 : maxFailures - 1;
+
+ // with speculative execution enabled we could observe +1 attempts
+ if (speculation) {
+ maxAttemptNo++;
+ }
+
+ return maxAttemptNo;
+ }
+
+ public static int getAttemptIdBits(int maxAttemptNo) {
+ return 32 - Integer.numberOfLeadingZeros(maxAttemptNo);
+ }
Review Comment:
This method could be made more generic (and reusable) as it does not care
about the meaning of the int argument:
```suggestion
public static int getNumberOfSignificantBits(int number) {
return 32 - Integer.numberOfLeadingZeros(number);
}
```
##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java:
##########
@@ -44,38 +45,46 @@ public class RssMRUtils {
private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
- private static final int MAX_ATTEMPT_LENGTH = 4;
- private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
- private static final int MAX_TASK_LENGTH = LAYOUT.taskAttemptIdBits -
MAX_ATTEMPT_LENGTH;
- private static final int MAX_TASK_ID = (1 << MAX_TASK_LENGTH) - 1;
// Class TaskAttemptId have two field id and mapId. MR have a trick logic,
taskAttemptId will
// increase 1000 * (appAttemptId - 1), so we will decrease it.
- public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID,
int appAttemptId) {
+ public static int createRssTaskAttemptId(
+ TaskAttemptID taskAttemptID, int appAttemptId, int maxAttemptNo) {
+ int attemptBits = ClientUtils.getAttemptIdBits(maxAttemptNo);
+
if (appAttemptId < 1) {
throw new RssException("appAttemptId " + appAttemptId + " is wrong");
}
- long lowBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000L;
- if (lowBytes > MAX_ATTEMPT_ID || lowBytes < 0) {
+ int attemptId = taskAttemptID.getId() - (appAttemptId - 1) * 1000;
+ if (attemptId > maxAttemptNo || attemptId < 0) {
throw new RssException(
- "TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed
" + MAX_ATTEMPT_ID);
+ "TaskAttempt " + taskAttemptID + " attemptId " + attemptId + "
exceed " + maxAttemptNo);
}
- long highBytes = taskAttemptID.getTaskID().getId();
- if (highBytes > MAX_TASK_ID || highBytes < 0) {
+ int taskId = taskAttemptID.getTaskID().getId();
+
+ int mapIndexBits = 32 - Integer.numberOfLeadingZeros(taskId);
+ if (mapIndexBits + attemptBits > LAYOUT.taskAttemptIdBits) {
throw new RssException(
- "TaskAttempt " + taskAttemptID + " high bytes " + highBytes + "
exceed " + MAX_TASK_ID);
+ "Observing taskId["
+ + taskId
+ + "] that would produce a taskAttemptId with "
+ + (mapIndexBits + attemptBits)
+ + " bits which is larger than the allowed "
+ + LAYOUT.taskAttemptIdBits
+ + "]). Please consider providing more bits for taskAttemptIds.");
}
- long taskAttemptId = (highBytes << (MAX_ATTEMPT_LENGTH)) + lowBytes;
- return LAYOUT.getBlockId(0, 0, taskAttemptId);
+
+ return (taskId << (attemptBits)) + attemptId;
Review Comment:
The parentheses around `attemptBits` are redundant:
```suggestion
return (taskId << attemptBits) + attemptId;
```
Given we bit-shift `taskId`, the bitwise OR operator `|` would be more appropriate than `+`:
```suggestion
return (taskId << attemptBits) | attemptId;
```
##########
client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java:
##########
@@ -75,7 +77,11 @@ public Roaring64NavigableMap fetchAllRssTaskIds() {
String errMsg = "TaskAttemptIDs are inconsistent with map tasks";
for (TaskAttemptID taskAttemptID : successMaps) {
if (!obsoleteMaps.contains(taskAttemptID)) {
- long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(taskAttemptID,
appAttemptId);
+ int maxFailures = jobConf.getInt(MRJobConfig.MAP_MAX_ATTEMPTS, 4);
+ boolean speculation = jobConf.getBoolean(MRJobConfig.MAP_SPECULATIVE,
true);
+ int maxAttemptNo = ClientUtils.getMaxAttemptNo(maxFailures,
speculation);
+ int rssTaskId =
+ RssMRUtils.createRssTaskAttemptId(taskAttemptID, appAttemptId,
maxAttemptNo);
Review Comment:
I'd suggest to move some of this logic into `RssMRUtils`:
```suggestion
int rssTaskId =
RssMRUtils.createRssTaskAttemptId(taskAttemptID, appAttemptId,
maxFailures, speculation);
```
As a convenience function, you could also provide (but keep above method for
testing):
```suggestion
int rssTaskId =
RssMRUtils.createRssTaskAttemptId(taskAttemptID, appAttemptId,
jobConf);
```
##########
client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java:
##########
@@ -215,6 +216,16 @@ public List<Event> close() throws Exception {
public void start() throws Exception {
if (!isStarted.get()) {
memoryUpdateCallbackHandler.validateUpdateReceived();
+ int maxFailures =
+ conf.getInt(
+ TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+ TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
+ boolean speculation =
+ conf.getBoolean(
+ TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
+ TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
+ int maxAttemptNo = ClientUtils.getMaxAttemptNo(maxFailures, speculation);
+ int rsstaskAttemptId = RssTezUtils.createRssTaskAttemptId(taskAttemptId,
maxAttemptNo);
Review Comment:
```suggestion
int rssTaskAttemptId =
RssTezUtils.createRssTaskAttemptId(taskAttemptId, maxAttemptNo);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]