This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1258407c6 [MINOR] refactor(common): Move blockId bit logic into common
class (#1527)
1258407c6 is described below
commit 1258407c672bb26f00f279b732072e6fd5abf51e
Author: Enrico Minack <[email protected]>
AuthorDate: Fri Feb 23 09:57:04 2024 +0100
[MINOR] refactor(common): Move blockId bit logic into common class (#1527)
### What changes were proposed in this pull request?
Moves block id bit manipulation logic into one place in `common.util`.
The logic is tested once and reused everywhere, including in `RssTezUtils` and
`RssMRUtils` where reasonable. Aligns the order of the arguments that construct a
`BlockId` with the bit-wise order inside the block id (highest to lowest).
### Why are the changes needed?
Reduces code duplication and improves code readability.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
---
.../org/apache/hadoop/mapreduce/RssMRUtils.java | 63 ++++--------
.../apache/hadoop/mapreduce/RssMRUtilsTest.java | 2 +-
.../spark/shuffle/writer/WriteBufferManager.java | 5 +-
.../shuffle/reader/AbstractRssReaderTest.java | 4 +-
.../shuffle/reader/RssShuffleDataIteratorTest.java | 4 +-
.../java/org/apache/tez/common/RssTezUtils.java | 52 +++-------
.../common/sort/buffer/WriteBufferManager.java | 5 +-
.../org/apache/tez/common/RssTezUtilsTest.java | 2 +-
.../uniffle/client/impl/ShuffleReadClientImpl.java | 9 +-
.../apache/uniffle/client/util/ClientUtils.java | 33 -------
.../uniffle/client/util/DefaultIdHelper.java | 4 +-
.../org/apache/uniffle/client/ClientUtilsTest.java | 32 +-----
.../client/impl/ShuffleReadClientImplTest.java | 21 ++--
.../org/apache/uniffle/common/BufferSegment.java | 7 +-
.../apache/uniffle/common/ShuffleBlockInfo.java | 3 +-
.../uniffle/common/ShufflePartitionedBlock.java | 8 +-
.../common/segment/FixedSizeSegmentSplitter.java | 5 +-
.../common/segment/LocalOrderSegmentSplitter.java | 5 +-
.../org/apache/uniffle/common/util/BlockId.java | 108 +++++++++++++++++++++
.../org/apache/uniffle/common/util/Constants.java | 11 ++-
.../org/apache/uniffle/common/util/RssUtils.java | 3 +-
.../apache/uniffle/common/BufferSegmentTest.java | 2 +-
.../uniffle/common/ShuffleBlockInfoTest.java | 14 +--
.../common/ShufflePartitionedBlockTest.java | 7 +-
.../apache/uniffle/common/util/BlockIdTest.java | 66 +++++++++++++
.../apache/uniffle/common/util/RssUtilsTest.java | 17 ++--
.../apache/uniffle/test/ShuffleReadWriteBase.java | 12 +--
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 16 +--
.../uniffle/test/ShuffleWithRssClientTest.java | 8 +-
.../uniffle/test/SparkClientWithLocalTest.java | 8 +-
.../apache/uniffle/server/ShuffleTaskManager.java | 5 +-
.../uniffle/server/ShuffleTaskManagerTest.java | 22 ++---
.../storage/common/FileBasedShuffleSegment.java | 8 +-
.../storage/HadoopShuffleHandlerTestBase.java | 16 +--
.../handler/impl/HadoopShuffleReadHandlerTest.java | 7 +-
.../handler/impl/LocalFileHandlerTestBase.java | 7 +-
36 files changed, 331 insertions(+), 270 deletions(-)
diff --git
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index 5be31d305..e92bf0702 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -37,32 +37,34 @@ import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
public class RssMRUtils {
private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
private static final int MAX_ATTEMPT_LENGTH = 6;
- private static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+ private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+ private static final int MAX_SEQUENCE_NO =
+ (1 << (Constants.ATOMIC_INT_MAX_LENGTH - MAX_ATTEMPT_LENGTH)) - 1;
// Class TaskAttemptId have two field id and mapId, rss taskAttemptID have
21 bits,
// mapId is 19 bits, id is 2 bits. MR have a trick logic, taskAttemptId will
increase
// 1000 * (appAttemptId - 1), so we will decrease it.
public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID,
int appAttemptId) {
- long lowBytes = taskAttemptID.getTaskID().getId();
+ int lowBytes = taskAttemptID.getTaskID().getId();
if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " +
lowBytes + " exceed");
}
if (appAttemptId < 1) {
throw new RssException("appAttemptId " + appAttemptId + " is wrong");
}
- long highBytes = (long) taskAttemptID.getId() - (appAttemptId - 1) * 1000;
+ int highBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000;
if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
throw new RssException(
"TaskAttempt " + taskAttemptID + " high bytes " + highBytes + "
exceed");
}
- return (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH +
Constants.PARTITION_ID_MAX_LENGTH))
- + lowBytes;
+ return BlockId.getBlockId(highBytes, 0, lowBytes);
}
public static TaskAttemptID createMRTaskAttemptId(
@@ -70,14 +72,9 @@ public class RssMRUtils {
if (appAttemptId < 1) {
throw new RssException("appAttemptId " + appAttemptId + " is wrong");
}
- TaskID taskID =
- new TaskID(jobID, taskType, (int) (rssTaskAttemptId &
Constants.MAX_TASK_ATTEMPT_ID));
- return new TaskAttemptID(
- taskID,
- (int)
- (rssTaskAttemptId
- >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH +
Constants.PARTITION_ID_MAX_LENGTH))
- + 1000 * (appAttemptId - 1));
+ TaskID taskID = new TaskID(jobID, taskType,
BlockId.getTaskAttemptId(rssTaskAttemptId));
+ int id = BlockId.getSequenceNo(rssTaskAttemptId) + 1000 * (appAttemptId -
1);
+ return new TaskAttemptID(taskID, id);
}
public static ShuffleWriteClient createShuffleClient(JobConf jobConf) {
@@ -229,51 +226,31 @@ public class RssMRUtils {
return rssJobConf.get(key, defaultValue);
}
- public static long getBlockId(long partitionId, long taskAttemptId, int
nextSeqNo) {
+ public static long getBlockId(int partitionId, long taskAttemptId, int
nextSeqNo) {
long attemptId =
taskAttemptId >> (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH);
if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
throw new RssException(
"Can't support attemptId [" + attemptId + "], the max value should
be " + MAX_ATTEMPT_ID);
}
- long atomicInt = (nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId;
- if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
+ if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
throw new RssException(
- "Can't support sequence ["
- + atomicInt
- + "], the max value should be "
- + Constants.MAX_SEQUENCE_NO);
- }
- if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
- throw new RssException(
- "Can't support partitionId["
- + partitionId
- + "], the max value should be "
- + Constants.MAX_PARTITION_ID);
+ "Can't support sequence [" + nextSeqNo + "], the max value should be
" + MAX_SEQUENCE_NO);
}
+
+ int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
long taskId =
taskAttemptId
- (attemptId
<< (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
- if (taskId < 0 || taskId > Constants.MAX_TASK_ATTEMPT_ID) {
- throw new RssException(
- "Can't support taskId["
- + taskId
- + "], the max value should be "
- + Constants.MAX_TASK_ATTEMPT_ID);
- }
- return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
- + taskId;
+
+ return BlockId.getBlockId(atomicInt, partitionId, taskId);
}
public static long getTaskAttemptId(long blockId) {
- long mapId = blockId & Constants.MAX_TASK_ATTEMPT_ID;
- long attemptId =
- (blockId >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH +
Constants.PARTITION_ID_MAX_LENGTH))
- & MAX_ATTEMPT_ID;
- return (attemptId << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH +
Constants.PARTITION_ID_MAX_LENGTH))
- + mapId;
+ int mapId = BlockId.getTaskAttemptId(blockId);
+ int attemptId = BlockId.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
+ return BlockId.getBlockId(attemptId, 0, mapId);
}
public static int estimateTaskConcurrency(JobConf jobConf) {
diff --git
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
index cb5c2c656..c95951eef 100644
---
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
+++
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java
@@ -87,7 +87,7 @@ public class RssMRUtilsTest {
long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
for (int partitionId = 0; partitionId <= 3000; partitionId++) {
for (int seqNo = 0; seqNo <= 10; seqNo++) {
- long blockId = RssMRUtils.getBlockId(Long.valueOf(partitionId),
taskAttemptId, seqNo);
+ long blockId = RssMRUtils.getBlockId(partitionId, taskAttemptId,
seqNo);
int newPartitionId =
Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
& mask);
assertEquals(partitionId, newPartitionId);
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 5b9a92841..9450c0f4e 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -46,12 +46,12 @@ import org.apache.spark.shuffle.RssSparkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
public class WriteBufferManager extends MemoryConsumer {
@@ -325,8 +325,7 @@ public class WriteBufferManager extends MemoryConsumer {
compressTime += System.currentTimeMillis() - start;
}
final long crc32 = ChecksumUtils.getCrc32(compressed);
- final long blockId =
- ClientUtils.getBlockId(partitionId, taskAttemptId,
getNextSeqNo(partitionId));
+ final long blockId = BlockId.getBlockId(getNextSeqNo(partitionId),
partitionId, taskAttemptId);
uncompressedDataLen += data.length;
shuffleWriteMetrics.incBytesWritten(compressed.length);
// add memory to indicate bytes which will be sent to shuffle server
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
index 072454760..90505d965 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/AbstractRssReaderTest.java
@@ -34,10 +34,10 @@ import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -106,7 +106,7 @@ public abstract class AbstractRssReaderTest extends
HadoopTestBase {
expectedData.put(key, value);
writeData(serializeStream, key, value);
}
- long blockId = ClientUtils.getBlockId(partitionID, 0,
atomicInteger.getAndIncrement());
+ long blockId = BlockId.getBlockId(atomicInteger.getAndIncrement(),
partitionID, 0);
blockIdBitmap.add(blockId);
blocks.add(createShuffleBlock(output.toBytes(), blockId, compress));
serializeStream.close();
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
index 0dea95147..2962bc141 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/reader/RssShuffleDataIteratorTest.java
@@ -40,9 +40,9 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
@@ -81,7 +81,7 @@ public class RssShuffleDataIteratorTest extends
AbstractRssReaderTest {
validateResult(rssShuffleDataIterator, expectedData, 10);
- blockIdBitmap.add(ClientUtils.getBlockId(0, 0, Constants.MAX_SEQUENCE_NO));
+ blockIdBitmap.add(BlockId.getBlockId(Constants.MAX_SEQUENCE_NO, 0, 0));
rssShuffleDataIterator =
getDataIterator(basePath, blockIdBitmap, taskIdBitmap,
Lists.newArrayList(ssi1));
int recNum = 0;
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
index e497995f1..c1e8643c5 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
@@ -53,6 +53,7 @@ import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
public class RssTezUtils {
@@ -60,7 +61,9 @@ public class RssTezUtils {
private static final Logger LOG = LoggerFactory.getLogger(RssTezUtils.class);
private static final int MAX_ATTEMPT_LENGTH = 6;
- private static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+ private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
+ private static final int MAX_SEQUENCE_NO =
+ (1 << (Constants.ATOMIC_INT_MAX_LENGTH - MAX_ATTEMPT_LENGTH)) - 1;
public static final String HOST_NAME = "hostname";
@@ -155,7 +158,7 @@ public class RssTezUtils {
return StringUtils.join(ids, "_", 0, 7);
}
- public static long getBlockId(long partitionId, long taskAttemptId, int
nextSeqNo) {
+ public static long getBlockId(int partitionId, long taskAttemptId, int
nextSeqNo) {
LOG.info(
"GetBlockId, partitionId:{}, taskAttemptId:{}, nextSeqNo:{}",
partitionId,
@@ -167,45 +170,24 @@ public class RssTezUtils {
throw new RssException(
"Can't support attemptId [" + attemptId + "], the max value should
be " + MAX_ATTEMPT_ID);
}
- long atomicInt = (nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId;
- if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
+ if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
throw new RssException(
- "Can't support sequence ["
- + atomicInt
- + "], the max value should be "
- + Constants.MAX_SEQUENCE_NO);
- }
- if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
- throw new RssException(
- "Can't support partitionId["
- + partitionId
- + "], the max value should be "
- + Constants.MAX_PARTITION_ID);
+ "Can't support sequence [" + nextSeqNo + "], the max value should be
" + MAX_SEQUENCE_NO);
}
+
+ int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
long taskId =
taskAttemptId
- (attemptId
<< (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
- if (taskId < 0 || taskId > Constants.MAX_TASK_ATTEMPT_ID) {
- throw new RssException(
- "Can't support taskId["
- + taskId
- + "], the max value should be "
- + Constants.MAX_TASK_ATTEMPT_ID);
- }
- return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
- + taskId;
+ return BlockId.getBlockId(atomicInt, partitionId, taskId);
}
public static long getTaskAttemptId(long blockId) {
- long mapId = blockId & Constants.MAX_TASK_ATTEMPT_ID;
- long attemptId =
- (blockId >> (Constants.TASK_ATTEMPT_ID_MAX_LENGTH +
Constants.PARTITION_ID_MAX_LENGTH))
- & MAX_ATTEMPT_ID;
- return (attemptId << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH +
Constants.PARTITION_ID_MAX_LENGTH))
- + mapId;
+ int mapId = BlockId.getTaskAttemptId(blockId);
+ int attemptId = BlockId.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
+ return BlockId.getBlockId(attemptId, 0, mapId);
}
public static int estimateTaskConcurrency(Configuration jobConf, int mapNum,
int reduceNum) {
@@ -298,18 +280,16 @@ public class RssTezUtils {
}
public static long convertTaskAttemptIdToLong(TezTaskAttemptID
taskAttemptID) {
- long lowBytes = taskAttemptID.getTaskID().getId();
+ int lowBytes = taskAttemptID.getTaskID().getId();
if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " +
lowBytes + " exceed");
}
- long highBytes = taskAttemptID.getId();
+ int highBytes = taskAttemptID.getId();
if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
throw new RssException(
"TaskAttempt " + taskAttemptID + " high bytes " + highBytes + "
exceed.");
}
- long id =
- (highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH +
Constants.PARTITION_ID_MAX_LENGTH))
- + lowBytes;
+ long id = BlockId.getBlockId(highBytes, 0, lowBytes);
LOG.info("ConvertTaskAttemptIdToLong taskAttemptID:{}, id is {}, .",
taskAttemptID, id);
return id;
}
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
index ec83a099f..389245802 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java
@@ -53,6 +53,7 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -371,8 +372,8 @@ public class WriteBufferManager<K, V> {
final long crc32 = ChecksumUtils.getCrc32(compressed);
compressTime += System.currentTimeMillis() - start;
final long blockId =
- RssTezUtils.getBlockId((long) partitionId, taskAttemptId,
getNextSeqNo(partitionId));
- LOG.info("blockId is {}", blockId);
+ RssTezUtils.getBlockId(partitionId, taskAttemptId,
getNextSeqNo(partitionId));
+ LOG.info("blockId is {}", BlockId.fromLong(blockId));
uncompressedDataLen += data.length;
// add memory to indicate bytes which will be sent to shuffle server
inSendListBytes.addAndGet(wb.getDataLength());
diff --git
a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
index c21173d15..639521ff7 100644
--- a/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
+++ b/client-tez/src/test/java/org/apache/tez/common/RssTezUtilsTest.java
@@ -99,7 +99,7 @@ public class RssTezUtilsTest {
long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
for (int partitionId = 0; partitionId <= 3000; partitionId++) {
for (int seqNo = 0; seqNo <= 10; seqNo++) {
- long blockId = RssTezUtils.getBlockId(Long.valueOf(partitionId),
taskAttemptId, seqNo);
+ long blockId = RssTezUtils.getBlockId(partitionId, taskAttemptId,
seqNo);
int newPartitionId =
Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
& mask);
assertEquals(partitionId, newPartitionId);
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 7e2f930a7..e1fbb9fec 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -41,6 +41,7 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssFetchFailedException;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.IdHelper;
import org.apache.uniffle.common.util.RssUtils;
@@ -211,14 +212,14 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(),
bs.getLength());
crcCheckTime.addAndGet(System.currentTimeMillis() - start);
} catch (Exception e) {
- LOG.warn("Can't read data for blockId[" + bs.getBlockId() + "]",
e);
+ LOG.warn("Can't read data for " +
BlockId.toString(bs.getBlockId()), e);
}
if (expectedCrc != actualCrc) {
String errMsg =
- "Unexpected crc value for blockId["
- + bs.getBlockId()
- + "], expected:"
+ "Unexpected crc value for "
+ + BlockId.toString(bs.getBlockId())
+ + ", expected:"
+ expectedCrc
+ ", actual:"
+ actualCrc;
diff --git
a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
index b3330c85a..b3d40dcde 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
@@ -29,43 +29,10 @@ import java.util.stream.Collectors;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.RemoteStorageInfo;
-import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.util.StorageType;
public class ClientUtils {
- // BlockId is positive long (63 bits) composed of partitionId, taskAttemptId
and AtomicInteger.
- // AtomicInteger is highest 18 bits, max value is 2^18 - 1
- // partitionId is middle 24 bits, max value is 2^24 - 1
- // taskAttemptId is lowest 21 bits, max value is 2^21 - 1
- // Values of partitionId, taskAttemptId and AtomicInteger are always
positive.
- public static Long getBlockId(long partitionId, long taskAttemptId, long
atomicInt) {
- if (atomicInt < 0 || atomicInt > Constants.MAX_SEQUENCE_NO) {
- throw new IllegalArgumentException(
- "Can't support sequence["
- + atomicInt
- + "], the max value should be "
- + Constants.MAX_SEQUENCE_NO);
- }
- if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
- throw new IllegalArgumentException(
- "Can't support partitionId["
- + partitionId
- + "], the max value should be "
- + Constants.MAX_PARTITION_ID);
- }
- if (taskAttemptId < 0 || taskAttemptId > Constants.MAX_TASK_ATTEMPT_ID) {
- throw new IllegalArgumentException(
- "Can't support taskAttemptId["
- + taskAttemptId
- + "], the max value should be "
- + Constants.MAX_TASK_ATTEMPT_ID);
- }
- return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
- + taskAttemptId;
- }
-
public static RemoteStorageInfo fetchRemoteStorage(
String appId,
RemoteStorageInfo defaultRemoteStorage,
diff --git
a/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
b/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
index 97376cc83..7161c4fee 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/DefaultIdHelper.java
@@ -17,12 +17,12 @@
package org.apache.uniffle.client.util;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.IdHelper;
public class DefaultIdHelper implements IdHelper {
@Override
public long getTaskAttemptId(long blockId) {
- return blockId & Constants.MAX_TASK_ATTEMPT_ID;
+ return BlockId.getTaskAttemptId(blockId);
}
}
diff --git
a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
index b8e7e7f5b..19ff6ac13 100644
--- a/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/ClientUtilsTest.java
@@ -33,12 +33,11 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.RssUtils;
import static org.apache.uniffle.client.util.ClientUtils.waitUntilDoneOrFail;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ClientUtilsTest {
@@ -46,33 +45,6 @@ public class ClientUtilsTest {
private ExecutorService executorService = Executors.newFixedThreadPool(10);
- @Test
- public void getBlockIdTest() {
- // max value of blockId
- assertEquals(new Long(854558029292503039L),
ClientUtils.getBlockId(16777215, 1048575, 24287));
- // just a random test
- assertEquals(new Long(3518437418598500L), ClientUtils.getBlockId(100, 100,
100));
- // min value of blockId
- assertEquals(new Long(0L), ClientUtils.getBlockId(0, 0, 0));
-
- final Throwable e1 =
- assertThrows(IllegalArgumentException.class, () ->
ClientUtils.getBlockId(16777216, 0, 0));
- assertTrue(
- e1.getMessage()
- .contains("Can't support partitionId[16777216], the max value
should be 16777215"));
-
- final Throwable e2 =
- assertThrows(IllegalArgumentException.class, () ->
ClientUtils.getBlockId(0, 2097152, 0));
- assertTrue(
- e2.getMessage()
- .contains("Can't support taskAttemptId[2097152], the max value
should be 2097151"));
-
- final Throwable e3 =
- assertThrows(IllegalArgumentException.class, () ->
ClientUtils.getBlockId(0, 0, 262144));
- assertTrue(
- e3.getMessage().contains("Can't support sequence[262144], the max
value should be 262143"));
- }
-
@Test
public void testGenerateTaskIdBitMap() {
int partitionId = 1;
@@ -82,7 +54,7 @@ public class ClientUtilsTest {
for (int i = 0; i < taskSize; i++) {
except[i] = i;
for (int j = 0; j < 100; j++) {
- Long blockId = ClientUtils.getBlockId(partitionId, i, j);
+ long blockId = BlockId.getBlockId(j, partitionId, i);
blockIdMap.addLong(blockId);
}
}
diff --git
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index 14e9af91a..eda8136d7 100644
---
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -38,6 +38,7 @@ import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.HadoopTestBase;
@@ -53,7 +54,7 @@ import static org.mockito.ArgumentMatchers.any;
public class ShuffleReadClientImplTest extends HadoopTestBase {
private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should
be thrown";
- private static AtomicLong ATOMIC_LONG = new AtomicLong(0);
+ private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1",
0);
private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2",
0);
@@ -91,7 +92,7 @@ public class ShuffleReadClientImplTest extends HadoopTestBase
{
readClient.checkProcessedBlockIds();
readClient.close();
- blockIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
+ blockIdBitmap.addLong(BlockId.getBlockId(0, 0,
Constants.MAX_TASK_ATTEMPT_ID - 1));
taskIdBitmap.addLong(Constants.MAX_TASK_ATTEMPT_ID - 1);
readClient =
baseReadBuilder()
@@ -377,7 +378,9 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
Roaring64NavigableMap wrongBlockIdBitmap =
Roaring64NavigableMap.bitmapOf();
LongIterator iter = blockIdBitmap.getLongIterator();
while (iter.hasNext()) {
- wrongBlockIdBitmap.addLong(iter.next() + (1 <<
Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+ BlockId blockId = BlockId.fromLong(iter.next());
+ wrongBlockIdBitmap.addLong(
+ BlockId.getBlockId(blockId.sequenceNo, blockId.partitionId + 1,
blockId.taskAttemptId));
}
ShuffleReadClientImpl readClient =
@@ -585,10 +588,7 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId =
- (ATOMIC_LONG.getAndIncrement()
- << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0,
taskAttemptId);
blocks.add(
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId,
taskAttemptId, buf));
@@ -610,10 +610,7 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId =
- (ATOMIC_LONG.getAndIncrement()
- << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0,
taskAttemptId);
ShufflePartitionedBlock spb =
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId,
taskAttemptId, buf);
diff --git a/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
b/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
index 1ab68c512..ac7faa4f2 100644
--- a/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
+++ b/common/src/main/java/org/apache/uniffle/common/BufferSegment.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.common;
import java.util.Objects;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.BlockId;
public class BufferSegment {
@@ -60,9 +61,9 @@ public class BufferSegment {
@Override
public String toString() {
- return "BufferSegment{blockId["
- + blockId
- + "], taskAttemptId["
+ return "BufferSegment{"
+ + BlockId.toString(blockId)
+ + ", taskAttemptId["
+ taskAttemptId
+ "], offset["
+ offset
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
index 8de75d90d..8b5fdc6d3 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleBlockInfo.java
@@ -22,6 +22,7 @@ import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ByteBufUtils;
public class ShuffleBlockInfo {
@@ -136,7 +137,7 @@ public class ShuffleBlockInfo {
sb.append("ShuffleBlockInfo:");
sb.append("shuffleId[" + shuffleId + "],");
sb.append("partitionId[" + partitionId + "],");
- sb.append("blockId[" + blockId + "],");
+ sb.append(BlockId.toString(blockId) + ",");
sb.append("length[" + length + "],");
sb.append("uncompressLength[" + uncompressLength + "],");
sb.append("crc[" + crc + "],");
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
index 1ce68b6b6..4772b5087 100644
---
a/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
+++
b/common/src/main/java/org/apache/uniffle/common/ShufflePartitionedBlock.java
@@ -22,6 +22,8 @@ import java.util.Objects;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import org.apache.uniffle.common.util.BlockId;
+
public class ShufflePartitionedBlock {
private int length;
@@ -123,9 +125,9 @@ public class ShufflePartitionedBlock {
@Override
public String toString() {
- return "ShufflePartitionedBlock{blockId["
- + blockId
- + "], length["
+ return "ShufflePartitionedBlock{"
+ + BlockId.toString(blockId)
+ + ", length["
+ length
+ "], uncompressLength["
+ uncompressLength
diff --git
a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
index 7b3626d40..898d0b195 100644
---
a/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
+++
b/common/src/main/java/org/apache/uniffle/common/segment/FixedSizeSegmentSplitter.java
@@ -29,7 +29,7 @@ import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
public class FixedSizeSegmentSplitter implements SegmentSplitter {
private static final Logger LOGGER =
LoggerFactory.getLogger(FixedSizeSegmentSplitter.class);
@@ -82,14 +82,13 @@ public class FixedSizeSegmentSplitter implements
SegmentSplitter {
// than the length in the actual data file, and it needs to be
returned at this time to
// avoid EOFException
if (dataFileLen != -1 && totalLength > dataFileLen) {
- long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
LOGGER.info(
"Abort inconsistent data, the data length: {}(bytes) recorded in
index file is greater than "
+ "the real data file length: {}(bytes). Partition id: {}. "
+ "This may happen when the data is flushing, please
ignore.",
totalLength,
dataFileLen,
- Math.toIntExact((blockId >>
Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
+ BlockId.getPartitionId(blockId));
break;
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
index 98b9cca70..cbf09793f 100644
---
a/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
+++
b/common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java
@@ -31,7 +31,7 @@ import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
/**
* {@class LocalOrderSegmentSplitter} will be initialized only when the {@class
@@ -104,14 +104,13 @@ public class LocalOrderSegmentSplitter implements
SegmentSplitter {
// than the length in the actual data file, and it needs to be
returned at this time to
// avoid EOFException
if (dataFileLen != -1 && totalLen > dataFileLen) {
- long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
LOGGER.info(
"Abort inconsistent data, the data length: {}(bytes) recorded in
index file is greater than "
+ "the real data file length: {}(bytes). Partition id: {}.
This should not happen. "
+ "This may happen when the data is flushing, please
ignore.",
totalLen,
dataFileLen,
- Math.toIntExact((blockId >>
Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask));
+ BlockId.getPartitionId(blockId));
break;
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/BlockId.java
b/common/src/main/java/org/apache/uniffle/common/util/BlockId.java
new file mode 100644
index 000000000..7aecaae41
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/BlockId.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+// BlockId is a non-negative long (63 bits) composed of, from the highest to the lowest bits,
+// a sequence number (AtomicInteger), partitionId and taskAttemptId.
+// The sequence number is the highest 18 bits, max value is 2^18 - 1
+// partitionId is the middle 24 bits, max value is 2^24 - 1
+// taskAttemptId is the lowest 21 bits, max value is 2^21 - 1
+// Values of the sequence number, partitionId and taskAttemptId are never negative.
+public class BlockId {
+ // NOTE(review): equals/hashCode are not overridden, so BlockId instances compare by
+ // identity; compare the blockId field to check whether two ids are equal.
+
+ /** The encoded long value of this block id. */
+ public final long blockId;
+ /** Sequence number, stored above the partitionId and taskAttemptId bits. */
+ public final int sequenceNo;
+ /** Partition id, stored above the taskAttemptId bits. */
+ public final int partitionId;
+ /** Task attempt id, stored in the lowest TASK_ATTEMPT_ID_MAX_LENGTH bits. */
+ public final int taskAttemptId;
+
+ // Private: instances are created through fromLong or fromIds.
+ private BlockId(long blockId, int sequenceNo, int partitionId, int
taskAttemptId) {
+ this.blockId = blockId;
+ this.sequenceNo = sequenceNo;
+ this.partitionId = partitionId;
+ this.taskAttemptId = taskAttemptId;
+ }
+
+ /**
+ * Formats this id as {@code blockId[<hex value> (seq: S, part: P, task: T)]}.
+ */
+ @Override
+ public String toString() {
+ return "blockId["
+ + Long.toHexString(blockId)
+ + " (seq: "
+ + sequenceNo
+ + ", part: "
+ + partitionId
+ + ", task: "
+ + taskAttemptId
+ + ")]";
+ }
+
+ /**
+ * Decodes {@code blockId} with {@link #fromLong(long)} and formats it like {@link #toString()}.
+ */
+ public static String toString(long blockId) {
+ return BlockId.fromLong(blockId).toString();
+ }
+
+ /**
+ * Decodes an encoded block id into its components.
+ *
+ * <p>No range validation is performed; every long is accepted.
+ *
+ * @param blockId encoded block id
+ * @return the decoded BlockId, keeping {@code blockId} as its raw value
+ */
+ public static BlockId fromLong(long blockId) {
+ int sequenceNo = getSequenceNo(blockId);
+ int partitionId = getPartitionId(blockId);
+ int taskAttemptId = getTaskAttemptId(blockId);
+ return new BlockId(blockId, sequenceNo, partitionId, taskAttemptId);
+ }
+
+ /**
+ * Encodes the given components with {@link #getBlockId(int, int, long)}.
+ *
+ * @throws IllegalArgumentException if any component is negative or above its max value
+ */
+ public static BlockId fromIds(int sequenceNo, int partitionId, int
taskAttemptId) {
+ long blockId = getBlockId(sequenceNo, partitionId, taskAttemptId);
+ return new BlockId(blockId, sequenceNo, partitionId, taskAttemptId);
+ }
+
+ /**
+ * Encodes a block id; the argument order matches the bit order, highest to lowest.
+ *
+ * @param sequenceNo sequence number in [0, Constants.MAX_SEQUENCE_NO]
+ * @param partitionId partition id in [0, Constants.MAX_PARTITION_ID]
+ * @param taskAttemptId task attempt id in [0, Constants.MAX_TASK_ATTEMPT_ID]
+ * @return the encoded block id
+ * @throws IllegalArgumentException if any component is outside its range
+ */
+ public static long getBlockId(int sequenceNo, int partitionId, long
taskAttemptId) {
+ if (sequenceNo < 0 || sequenceNo > Constants.MAX_SEQUENCE_NO) {
+ throw new IllegalArgumentException(
+ "Can't support sequence["
+ + sequenceNo
+ + "], the max value should be "
+ + Constants.MAX_SEQUENCE_NO);
+ }
+ if (partitionId < 0 || partitionId > Constants.MAX_PARTITION_ID) {
+ throw new IllegalArgumentException(
+ "Can't support partitionId["
+ + partitionId
+ + "], the max value should be "
+ + Constants.MAX_PARTITION_ID);
+ }
+ if (taskAttemptId < 0 || taskAttemptId > Constants.MAX_TASK_ATTEMPT_ID) {
+ throw new IllegalArgumentException(
+ "Can't support taskAttemptId["
+ + taskAttemptId
+ + "], the max value should be "
+ + Constants.MAX_TASK_ATTEMPT_ID);
+ }
+
+ // Each component was range-checked above, so the shifted fields cannot overlap
+ // and '+' behaves like a bitwise OR. The (long) casts avoid int overflow on shift.
+ return ((long) sequenceNo
+ << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+ + ((long) partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
+ + taskAttemptId;
+ }
+
+ /**
+ * Extracts the sequence number: the bits above partitionId and taskAttemptId.
+ *
+ * <p>NOTE(review): the arithmetic shift is not masked, so a negative blockId such as
+ * Constants.INVALID_BLOCK_ID (-1) yields a negative sequence number.
+ */
+ public static int getSequenceNo(long blockId) {
+ return (int)
+ (blockId >> (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+ }
+
+ /**
+ * Extracts the partition id: shifts out the taskAttemptId bits, then masks with
+ * Constants.MAX_PARTITION_ID.
+ */
+ public static int getPartitionId(long blockId) {
+ return (int) ((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) &
Constants.MAX_PARTITION_ID);
+ }
+
+ /** Extracts the task attempt id by masking with Constants.MAX_TASK_ATTEMPT_ID. */
+ public static int getTaskAttemptId(long blockId) {
+ return (int) (blockId & Constants.MAX_TASK_ATTEMPT_ID);
+ }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 4b35463cc..a361eabcd 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -29,14 +29,17 @@ public final class Constants {
public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
// BlockId is long and consist of partitionId, taskAttemptId, atomicInt
// the length of them are ATOMIC_INT_MAX_LENGTH + PARTITION_ID_MAX_LENGTH +
- // TASK_ATTEMPT_ID_MAX_LENGTH = 63
+ // TASK_ATTEMPT_ID_MAX_LENGTH = 63; each of these lengths must be less than 32.
+ // Prefer the methods provided by org.apache.uniffle.common.util.BlockId over
+ // implementing your own bit manipulation logic on top of these fields.
public static final int PARTITION_ID_MAX_LENGTH = 24;
public static final int TASK_ATTEMPT_ID_MAX_LENGTH = 21;
public static final int ATOMIC_INT_MAX_LENGTH = 18;
- public static final long MAX_SEQUENCE_NO = (1 <<
Constants.ATOMIC_INT_MAX_LENGTH) - 1;
- public static final long MAX_PARTITION_ID = (1 <<
Constants.PARTITION_ID_MAX_LENGTH) - 1;
- public static final long MAX_TASK_ATTEMPT_ID = (1 <<
Constants.TASK_ATTEMPT_ID_MAX_LENGTH) - 1;
+ public static final int MAX_SEQUENCE_NO = (1 <<
Constants.ATOMIC_INT_MAX_LENGTH) - 1;
+ public static final int MAX_PARTITION_ID = (1 <<
Constants.PARTITION_ID_MAX_LENGTH) - 1;
+ public static final int MAX_TASK_ATTEMPT_ID = (1 <<
Constants.TASK_ATTEMPT_ID_MAX_LENGTH) - 1;
public static final long INVALID_BLOCK_ID = -1L;
+
public static final String KEY_SPLIT_CHAR = "/";
public static final String COMMA_SPLIT_CHAR = ",";
public static final String EQUAL_SPLIT_CHAR = "=";
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index ad2b4c4fb..6d4e2308f 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -335,10 +335,9 @@ public class RssUtils {
result.computeIfAbsent(partitionId, key ->
Roaring64NavigableMap.bitmapOf());
}
Iterator<Long> it = shuffleBitmap.iterator();
- long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
while (it.hasNext()) {
Long blockId = it.next();
- int partitionId = Math.toIntExact((blockId >>
Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+ int partitionId = BlockId.getPartitionId(blockId);
if (partitionId >= startPartition && partitionId < endPartition) {
result.get(partitionId).add(blockId);
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
b/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
index ca1048521..49a1c58c2 100644
--- a/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/BufferSegmentTest.java
@@ -58,7 +58,7 @@ public class BufferSegmentTest {
public void testToString() {
BufferSegment segment = new BufferSegment(0, 1, 2, 3, 4, 5);
assertEquals(
- "BufferSegment{blockId[0], taskAttemptId[5], offset[1], length[2],
crc[4], uncompressLength[3]}",
+ "BufferSegment{blockId[0 (seq: 0, part: 0, task: 0)],
taskAttemptId[5], offset[1], length[2], crc[4], uncompressLength[3]}",
segment.toString());
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
b/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
index 71db702c8..fdac0e9eb 100644
--- a/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ShuffleBlockInfoTest.java
@@ -22,6 +22,8 @@ import java.util.List;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.util.BlockId;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
public class ShuffleBlockInfoTest {
@@ -37,9 +39,9 @@ public class ShuffleBlockInfoTest {
+ info.getShuffleId()
+ "],partitionId["
+ info.getPartitionId()
- + "],blockId["
- + info.getBlockId()
- + "],length["
+ + "],"
+ + BlockId.toString(info.getBlockId())
+ + ",length["
+ info.getLength()
+ "],uncompressLength["
+ info.getUncompressLength()
@@ -54,9 +56,9 @@ public class ShuffleBlockInfoTest {
+ info2.getShuffleId()
+ "],partitionId["
+ info2.getPartitionId()
- + "],blockId["
- + info2.getBlockId()
- + "],length["
+ + "],"
+ + BlockId.toString(info2.getBlockId())
+ + ",length["
+ info2.getLength()
+ "],uncompressLength["
+ info2.getUncompressLength()
diff --git
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
index 3f894ab9e..7f402e59d 100644
---
a/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/ShufflePartitionedBlockTest.java
@@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ByteBufUtils;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -70,9 +71,9 @@ public class ShufflePartitionedBlockTest {
public void testToString() {
ShufflePartitionedBlock b1 = new ShufflePartitionedBlock(1, 2, 3, 4, 5,
new byte[6]);
assertEquals(
- "ShufflePartitionedBlock{blockId["
- + b1.getBlockId()
- + "], length["
+ "ShufflePartitionedBlock{"
+ + BlockId.toString(b1.getBlockId())
+ + ", length["
+ b1.getLength()
+ "], uncompressLength["
+ b1.getUncompressLength()
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java
b/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java
new file mode 100644
index 000000000..e42717dc5
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/util/BlockIdTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class BlockIdTest {
+ // Covers BlockId encoding, toString formatting and range validation of each component.
+ @Test
+ public void test() {
+ // large blockId: partitionId is at its max (2^24 - 1); sequenceNo (24287) and
+ // taskAttemptId (1048575) are below their maxima (2^18 - 1 and 2^21 - 1)
+ assertEquals(854558029292503039L, BlockId.getBlockId(24287, 16777215,
1048575));
+ // arbitrary mid-range components
+ assertEquals(3518437418598500L, BlockId.getBlockId(100, 100, 100));
+ // min value of blockId
+ assertEquals(0L, BlockId.getBlockId(0, 0, 0));
+
+ // toString shows the hex-encoded id followed by the decoded components
+ BlockId blockId = BlockId.fromIds(100, 100, 100);
+ assertEquals("blockId[c80000c800064 (seq: 100, part: 100, task: 100)]",
blockId.toString());
+
+ // each component one above its max value is rejected
+ final Throwable e1 =
+ assertThrows(IllegalArgumentException.class, () ->
BlockId.getBlockId(262144, 0, 0));
+ assertEquals("Can't support sequence[262144], the max value should be
262143", e1.getMessage());
+
+ final Throwable e2 =
+ assertThrows(IllegalArgumentException.class, () ->
BlockId.getBlockId(0, 16777216, 0));
+ assertEquals(
+ "Can't support partitionId[16777216], the max value should be
16777215", e2.getMessage());
+
+ final Throwable e3 =
+ assertThrows(IllegalArgumentException.class, () ->
BlockId.getBlockId(0, 0, 2097152));
+ assertEquals(
+ "Can't support taskAttemptId[2097152], the max value should be
2097151", e3.getMessage());
+
+ // negative components are rejected as well
+ final Throwable e4 =
+ assertThrows(IllegalArgumentException.class, () ->
BlockId.getBlockId(-1, 0, 0));
+ assertEquals("Can't support sequence[-1], the max value should be 262143",
e4.getMessage());
+
+ final Throwable e5 =
+ assertThrows(IllegalArgumentException.class, () ->
BlockId.getBlockId(0, -1, 0));
+ assertEquals(
+ "Can't support partitionId[-1], the max value should be 16777215",
e5.getMessage());
+
+ final Throwable e6 =
+ assertThrows(IllegalArgumentException.class, () ->
BlockId.getBlockId(0, 0, -1));
+ assertEquals(
+ "Can't support taskAttemptId[-1], the max value should be 2097151",
e6.getMessage());
+ }
+}
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 0e7048684..d8457ae99 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -229,10 +229,16 @@ public class RssUtilsTest {
public void testShuffleBitmapToPartitionBitmap() {
Roaring64NavigableMap partition1Bitmap =
Roaring64NavigableMap.bitmapOf(
- getBlockId(0, 0, 0), getBlockId(0, 0, 1), getBlockId(0, 1, 0),
getBlockId(0, 1, 1));
+ BlockId.getBlockId(0, 0, 0),
+ BlockId.getBlockId(1, 0, 0),
+ BlockId.getBlockId(0, 0, 1),
+ BlockId.getBlockId(1, 0, 1));
Roaring64NavigableMap partition2Bitmap =
Roaring64NavigableMap.bitmapOf(
- getBlockId(1, 0, 0), getBlockId(1, 0, 1), getBlockId(1, 1, 0),
getBlockId(1, 1, 1));
+ BlockId.getBlockId(0, 1, 0),
+ BlockId.getBlockId(1, 1, 0),
+ BlockId.getBlockId(0, 1, 1),
+ BlockId.getBlockId(1, 1, 1));
Roaring64NavigableMap shuffleBitmap = Roaring64NavigableMap.bitmapOf();
shuffleBitmap.or(partition1Bitmap);
shuffleBitmap.or(partition2Bitmap);
@@ -291,13 +297,6 @@ public class RssUtilsTest {
});
}
- // Copy from ClientUtils
- private Long getBlockId(long partitionId, long taskAttemptId, long
atomicInt) {
- return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
- + taskAttemptId;
- }
-
interface RssUtilTestDummy {
String get();
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
index dc43ca194..c21808c78 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -40,12 +40,12 @@ import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
import org.apache.uniffle.common.segment.SegmentSplitter;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
- private static AtomicLong ATOMIC_LONG = new AtomicLong(0L);
+ private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
public static List<ShuffleServerInfo> mockSSI =
Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
@@ -62,11 +62,9 @@ public abstract class ShuffleReadWriteBase extends
IntegrationTestBase {
for (int i = 0; i < blockNum; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long seqno = ATOMIC_LONG.getAndIncrement();
+ int seqno = ATOMIC_INT.getAndIncrement();
- long blockId =
- (seqno << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(seqno, 0, taskAttemptId);
blockIdBitmap.addLong(blockId);
dataMap.put(blockId, buf);
shuffleBlockInfoList.add(
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index a1d662c84..64d7739f1 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -56,7 +56,6 @@ import
org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssRegisterShuffleResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -65,6 +64,7 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.TestUtils;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.proto.RssProtos;
@@ -338,17 +338,17 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
assertEquals(expectedP3, blockIdBitmap);
partitionToBlockIds = Maps.newHashMap();
- blockIds1 = getBlockIdList((int) Constants.MAX_PARTITION_ID, 3);
+ blockIds1 = getBlockIdList(Constants.MAX_PARTITION_ID, 3);
blockIds2 = getBlockIdList(2, 2);
blockIds3 = getBlockIdList(3, 1);
- partitionToBlockIds.put((int) Constants.MAX_PARTITION_ID, blockIds1);
+ partitionToBlockIds.put(Constants.MAX_PARTITION_ID, blockIds1);
partitionToBlockIds.put(2, blockIds2);
partitionToBlockIds.put(3, blockIds3);
// bimapNum = 2
request = new RssReportShuffleResultRequest("shuffleResultTest", 4, 1L,
partitionToBlockIds, 2);
shuffleServerClient.reportShuffleResult(request);
- req = new RssGetShuffleResultRequest("shuffleResultTest", 4, (int)
Constants.MAX_PARTITION_ID);
+ req = new RssGetShuffleResultRequest("shuffleResultTest", 4,
Constants.MAX_PARTITION_ID);
result = shuffleServerClient.getShuffleResult(req);
blockIdBitmap = result.getBlockIdBitmap();
expectedP1 = Roaring64NavigableMap.bitmapOf();
@@ -610,7 +610,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
for (int i = 0; i < 100; i++) {
Map<Integer, List<Long>> ptbs = Maps.newHashMap();
List<Long> blockIds = Lists.newArrayList();
- Long blockId = ClientUtils.getBlockId(1, 0, i);
+ Long blockId = BlockId.getBlockId(i, 1, 0);
expectedBlockIds.add(blockId);
blockIds.add(blockId);
ptbs.put(1, blockIds);
@@ -624,7 +624,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
for (int i = 100; i < 200; i++) {
Map<Integer, List<Long>> ptbs = Maps.newHashMap();
List<Long> blockIds = Lists.newArrayList();
- Long blockId = ClientUtils.getBlockId(1, 1, i);
+ Long blockId = BlockId.getBlockId(i, 1, 1);
expectedBlockIds.add(blockId);
blockIds.add(blockId);
ptbs.put(1, blockIds);
@@ -638,7 +638,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
for (int i = 200; i < 300; i++) {
Map<Integer, List<Long>> ptbs = Maps.newHashMap();
List<Long> blockIds = Lists.newArrayList();
- Long blockId = ClientUtils.getBlockId(1, 2, i);
+ Long blockId = BlockId.getBlockId(i, 1, 2);
expectedBlockIds.add(blockId);
blockIds.add(blockId);
ptbs.put(1, blockIds);
@@ -984,7 +984,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
private List<Long> getBlockIdList(int partitionId, int blockNum) {
List<Long> blockIds = Lists.newArrayList();
for (int i = 0; i < blockNum; i++) {
- blockIds.add(ClientUtils.getBlockId(partitionId, 0,
atomicInteger.getAndIncrement()));
+ blockIds.add(BlockId.getBlockId(atomicInteger.getAndIncrement(),
partitionId, 0));
}
return blockIds;
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 1442b2116..d87badafc 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -37,7 +37,6 @@ import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
-import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
@@ -45,6 +44,7 @@ import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -203,7 +203,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
Set<Long> blockIds = Sets.newHashSet();
int partitionIdx = 1;
for (int i = 0; i < 5; i++) {
- blockIds.add(ClientUtils.getBlockId(partitionIdx, 0, i));
+ blockIds.add(BlockId.getBlockId(i, partitionIdx, 0));
}
partitionToBlocks.put(partitionIdx, blockIds);
serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlocks);
@@ -251,7 +251,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
Map<Integer, Set<Long>> partitionToBlocks1 = Maps.newHashMap();
Set<Long> blockIds = Sets.newHashSet();
for (int i = 0; i < 5; i++) {
- blockIds.add(ClientUtils.getBlockId(1, 0, i));
+ blockIds.add(BlockId.getBlockId(i, 1, 0));
}
partitionToBlocks1.put(1, blockIds);
Map<ShuffleServerInfo, Map<Integer, Set<Long>>>
serverToPartitionToBlockIds = Maps.newHashMap();
@@ -260,7 +260,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
Map<Integer, Set<Long>> partitionToBlocks2 = Maps.newHashMap();
blockIds = Sets.newHashSet();
for (int i = 0; i < 7; i++) {
- blockIds.add(ClientUtils.getBlockId(2, 0, i));
+ blockIds.add(BlockId.getBlockId(i, 2, 0));
}
partitionToBlocks2.put(2, blockIds);
serverToPartitionToBlockIds.put(shuffleServerInfo2, partitionToBlocks2);
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 2eb2ffad7..65b699e2c 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -42,7 +42,7 @@ import
org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -105,7 +105,7 @@ public class SparkClientWithLocalTest extends
ShuffleReadWriteBase {
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap);
- blockIdBitmap.addLong((1 << Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+ blockIdBitmap.addLong(BlockId.getBlockId(0, 1, 0));
ShuffleReadClientImpl readClient;
readClient =
baseReadBuilder()
@@ -252,7 +252,9 @@ public class SparkClientWithLocalTest extends
ShuffleReadWriteBase {
Roaring64NavigableMap wrongBlockIdBitmap =
Roaring64NavigableMap.bitmapOf();
LongIterator iter = blockIdBitmap.getLongIterator();
while (iter.hasNext()) {
- wrongBlockIdBitmap.addLong(iter.next() + (1 <<
Constants.TASK_ATTEMPT_ID_MAX_LENGTH));
+ BlockId blockId = BlockId.fromLong(iter.next());
+ wrongBlockIdBitmap.addLong(
+ BlockId.getBlockId(blockId.sequenceNo, blockId.partitionId + 1,
blockId.taskAttemptId));
}
ShuffleReadClientImpl readClient =
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 7977b8016..c40003c85 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -62,6 +62,7 @@ import
org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
import org.apache.uniffle.common.exception.NoRegisterException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
@@ -573,11 +574,9 @@ public class ShuffleTaskManager {
Set<Integer> requestPartitions,
Roaring64NavigableMap bitmap,
Roaring64NavigableMap resultBitmap) {
- final long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
bitmap.forEach(
blockId -> {
- int partitionId =
- Math.toIntExact((blockId >>
Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
+ int partitionId = BlockId.getPartitionId(blockId);
if (requestPartitions.contains(partitionId)) {
resultBitmap.addLong(blockId);
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index fb1de11c2..426ef4ba8 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -54,6 +54,7 @@ import
org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
import org.apache.uniffle.common.exception.NoRegisterException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
@@ -714,7 +715,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
for (int taskId = 1; taskId < 10; taskId++) {
for (int partitionId = 1; partitionId < 10; partitionId++) {
for (int i = 0; i < 2; i++) {
- long blockId = getBlockId(partitionId, taskId, i);
+ long blockId = BlockId.getBlockId(i, partitionId, taskId);
bitmapBlockIds.addLong(blockId);
if (partitionId == expectedPartitionId) {
expectedBlockIds.addLong(blockId);
@@ -727,19 +728,19 @@ public class ShuffleTaskManagerTest extends
HadoopTestBase {
Sets.newHashSet(expectedPartitionId), bitmapBlockIds,
Roaring64NavigableMap.bitmapOf());
assertEquals(expectedBlockIds, resultBlockIds);
- bitmapBlockIds.addLong(getBlockId(0, 0, 0));
+ bitmapBlockIds.addLong(BlockId.getBlockId(0, 0, 0));
resultBlockIds =
shuffleTaskManager.getBlockIdsByPartitionId(
Sets.newHashSet(0), bitmapBlockIds,
Roaring64NavigableMap.bitmapOf());
assertEquals(Roaring64NavigableMap.bitmapOf(0L), resultBlockIds);
long expectedBlockId =
- getBlockId(
- Constants.MAX_PARTITION_ID, Constants.MAX_TASK_ATTEMPT_ID,
Constants.MAX_SEQUENCE_NO);
+ BlockId.getBlockId(
+ Constants.MAX_SEQUENCE_NO, Constants.MAX_PARTITION_ID,
Constants.MAX_TASK_ATTEMPT_ID);
bitmapBlockIds.addLong(expectedBlockId);
resultBlockIds =
shuffleTaskManager.getBlockIdsByPartitionId(
- Sets.newHashSet(Math.toIntExact(Constants.MAX_PARTITION_ID)),
+ Sets.newHashSet(Constants.MAX_PARTITION_ID),
bitmapBlockIds,
Roaring64NavigableMap.bitmapOf());
assertEquals(Roaring64NavigableMap.bitmapOf(expectedBlockId),
resultBlockIds);
@@ -757,7 +758,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
for (int taskId = 1; taskId < 10; taskId++) {
for (int partitionId = 1; partitionId < 10; partitionId++) {
for (int i = 0; i < 2; i++) {
- long blockId = getBlockId(partitionId, taskId, i);
+ long blockId = BlockId.getBlockId(i, partitionId, taskId);
bitmapBlockIds.addLong(blockId);
if (partitionId >= startPartition && partitionId <= endPartition) {
expectedBlockIds.addLong(blockId);
@@ -830,7 +831,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
long[] blockIds = new long[taskNum * blocksPerTask];
for (int taskId = 0; taskId < taskNum; taskId++) {
for (int i = 0; i < blocksPerTask; i++) {
- long blockId = getBlockId(partitionId, taskId, i);
+ long blockId = BlockId.getBlockId(i, partitionId, taskId);
blockIds[taskId * blocksPerTask + i] = blockId;
}
}
@@ -993,13 +994,6 @@ public class ShuffleTaskManagerTest extends HadoopTestBase
{
return appIdsOnDisk;
}
- // copy from ClientUtils
- private Long getBlockId(long partitionId, long taskAttemptId, long
atomicInt) {
- return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH)
- + taskAttemptId;
- }
-
private void waitForFlush(
ShuffleFlushManager shuffleFlushManager, String appId, int shuffleId,
int expectedBlockNum)
throws Exception {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
b/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
index d4e25c4d8..38f3abbfa 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/FileBasedShuffleSegment.java
@@ -19,6 +19,8 @@ package org.apache.uniffle.storage.common;
import java.util.Objects;
+import org.apache.uniffle.common.util.BlockId;
+
public class FileBasedShuffleSegment extends ShuffleSegment
implements Comparable<FileBasedShuffleSegment> {
@@ -119,9 +121,9 @@ public class FileBasedShuffleSegment extends ShuffleSegment
+ uncompressLength
+ "], crc["
+ crc
- + "], blockId["
- + blockId
- + "], taskAttemptId["
+ + "], "
+ + BlockId.toString(blockId)
+ + ", taskAttemptId["
+ taskAttemptId
+ "]}";
}
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
index b79d18fe3..7b3049368 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/HadoopShuffleHandlerTestBase.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
@@ -30,9 +30,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.handler.impl.HadoopFileReader;
import org.apache.uniffle.storage.handler.impl.HadoopFileWriter;
@@ -43,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class HadoopShuffleHandlerTestBase {
- private static final AtomicLong ATOMIC_LONG = new AtomicLong(0);
+ private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);
public static void writeTestData(
HadoopShuffleWriteHandler writeHandler,
@@ -56,10 +56,7 @@ public class HadoopShuffleHandlerTestBase {
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId =
- (ATOMIC_LONG.getAndIncrement()
- << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0,
taskAttemptId);
blocks.add(
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId,
taskAttemptId, buf));
@@ -84,10 +81,7 @@ public class HadoopShuffleHandlerTestBase {
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId =
- (ATOMIC_LONG.getAndIncrement()
- << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(ATOMIC_INT.getAndIncrement(), 0,
taskAttemptId);
blocks.add(
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId,
taskAttemptId, buf));
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
index ed383388c..8b307a3e3 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandlerTest.java
@@ -36,8 +36,8 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.HadoopShuffleHandlerTestBase;
import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -118,10 +118,7 @@ public class HadoopShuffleReadHandlerTest extends
HadoopTestBase {
List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
byte[] buf = new byte[blockSize];
new Random().nextBytes(buf);
- long blockId =
- (expectTotalBlockNum
- << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
- + taskAttemptId;
+ long blockId = BlockId.getBlockId(expectTotalBlockNum, 0, taskAttemptId);
blocks.add(
new ShufflePartitionedBlock(
blockSize, blockSize, ChecksumUtils.getCrc32(buf), blockId,
taskAttemptId, buf));
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
index e02387e25..19a2905ac 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/LocalFileHandlerTestBase.java
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -33,6 +33,7 @@ import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.segment.FixedSizeSegmentSplitter;
+import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
@@ -43,14 +44,14 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class LocalFileHandlerTestBase {
- private static AtomicLong ATOMIC_LONG = new AtomicLong(0L);
+ private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
public static List<ShufflePartitionedBlock> generateBlocks(int num, int
length) {
List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
- long blockId = ATOMIC_LONG.incrementAndGet();
+ long blockId = BlockId.getBlockId(ATOMIC_INT.incrementAndGet(), 0, 100);
blocks.add(
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), blockId, 100, buf));