This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 55191c43 [Improvement] Skip blocks when read from memory (#294)
55191c43 is described below

commit 55191c43cc94dc0e72ca378e3c65b743bf66bbb0
Author: xianjingfeng <[email protected]>
AuthorDate: Sun Dec 11 22:27:07 2022 +0800

    [Improvement] Skip blocks when read from memory (#294)
    
    ### What changes were proposed in this pull request?
    Skip blocks that are not in the expected blockId range when reading from memory.
    
    ### Why are the changes needed?
    1. If we use AQE, every task will read data from all partitions.
    2. If the data of the first shuffle server is incomplete, we need to read 
from another server if #276 is merged.
    Both of the above situations lead to reading redundant data from the 
shuffle server.
    ### Does this PR introduce _any_ user-facing change?
    Set `rss.client.read.block.skip.strategy` to `BLOCKID_RANGE`.
    
    ### How was this patch tested?
    Tests have already been added in this patch.
---
 .../org/apache/hadoop/mapreduce/RssMRConfig.java   | 10 +++
 .../hadoop/mapreduce/task/reduce/RssShuffle.java   | 11 ++-
 .../org/apache/spark/shuffle/RssSparkConfig.java   |  6 ++
 .../spark/shuffle/reader/RssShuffleReader.java     | 15 +++-
 .../spark/shuffle/reader/RssShuffleReader.java     | 21 +++--
 .../client/factory/ShuffleClientFactory.java       |  3 +-
 .../uniffle/client/impl/ShuffleReadClientImpl.java | 11 ++-
 .../request/CreateShuffleReadClientRequest.java    | 38 ++++++---
 .../uniffle/client/util/RssClientConfig.java       |  9 ++-
 .../apache/uniffle/common/BlockSkipStrategy.java   | 24 ++++++
 .../org/apache/uniffle/common/util/RssUtils.java   | 92 +++++++++++++++++++++
 .../apache/uniffle/common/util/RssUtilsTest.java   | 94 ++++++++++++++++++++++
 .../uniffle/test/ShuffleServerWithMemoryTest.java  | 62 ++++++++++++++
 ...SkewedJoinWithBlockIdRangeSkipStrategyTest.java | 32 ++++++++
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |  1 +
 .../request/RssGetInMemoryShuffleDataRequest.java  | 11 ++-
 proto/src/main/proto/Rss.proto                     |  1 +
 .../uniffle/server/ShuffleServerGrpcService.java   |  4 +-
 .../apache/uniffle/server/ShuffleTaskManager.java  |  4 +-
 .../uniffle/server/buffer/ShuffleBuffer.java       | 42 +++++++---
 .../server/buffer/ShuffleBufferManager.java        |  6 +-
 .../server/buffer/ShuffleBufferManagerTest.java    | 33 ++++++--
 .../uniffle/server/buffer/ShuffleBufferTest.java   | 79 ++++++++++--------
 .../storage/factory/ShuffleHandlerFactory.java     |  5 +-
 .../handler/impl/MemoryClientReadHandler.java      | 35 ++++++--
 .../request/CreateShuffleReadHandlerRequest.java   | 20 +++--
 26 files changed, 579 insertions(+), 90 deletions(-)

diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index f9feb85f..f37c6e06 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -179,6 +179,16 @@ public class RssMRConfig {
   public static final int 
RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE =
       RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE;
 
+  public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY =
+      MR_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY;
+  public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE 
=
+      RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE;
+
+  public static final String RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS =
+      MR_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS;
+  public static final int 
RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE =
+      RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE;
+
   public static final String RSS_CONF_FILE = "rss_conf.xml";
 
   public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
diff --git 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index e5af9795..32c14b06 100644
--- 
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++ 
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -42,6 +42,7 @@ import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
+import org.apache.uniffle.common.BlockSkipStrategy;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.UnitConverter;
@@ -82,6 +83,8 @@ public class RssShuffle<K, V> implements 
ShuffleConsumerPlugin<K, V>, ExceptionR
   private int readBufferSize;
   private RemoteStorageInfo remoteStorageInfo;
   private int appAttemptId;
+  private BlockSkipStrategy blockSkipStrategy;
+  private int maxBlockIdRangeSegments;
 
   @Override
   public void init(ShuffleConsumerPlugin.Context context) {
@@ -109,6 +112,12 @@ public class RssShuffle<K, V> implements 
ShuffleConsumerPlugin<K, V>, ExceptionR
         RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
     this.replica = RssMRUtils.getInt(rssJobConf, mrJobConf, 
RssMRConfig.RSS_DATA_REPLICA,
         RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);
+    blockSkipStrategy = 
BlockSkipStrategy.valueOf(RssMRUtils.getString(rssJobConf, mrJobConf,
+            RssMRConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY,
+        RssMRConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE));
+    maxBlockIdRangeSegments = RssMRUtils.getInt(rssJobConf, mrJobConf,
+        RssMRConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS,
+        RssMRConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE);
 
     this.partitionNum = mrJobConf.getNumReduceTasks();
     this.partitionNumPerRange = RssMRUtils.getInt(rssJobConf, mrJobConf, 
RssMRConfig.RSS_PARTITION_NUM_PER_RANGE,
@@ -194,7 +203,7 @@ public class RssShuffle<K, V> implements 
ShuffleConsumerPlugin<K, V>, ExceptionR
       CreateShuffleReadClientRequest request = new 
CreateShuffleReadClientRequest(
           appId, 0, reduceId.getTaskID().getId(), storageType, basePath, 
indexReadLimit, readBufferSize,
           partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, 
serverInfoList,
-          readerJobConf, new MRIdHelper());
+          readerJobConf, new MRIdHelper(), blockSkipStrategy, 
maxBlockIdRangeSegments);
       ShuffleReadClient shuffleReadClient = 
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
       RssFetcher fetcher = new RssFetcher(mrJobConf, reduceId, taskStatus, 
merger, copyPhase, reporter, metrics,
           shuffleReadClient, blockIdBitmap.getLongCardinality(), 
RssMRConfig.toRssConf(rssJobConf));
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 9da401d2..958a05df 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -265,6 +265,12 @@ public class RssSparkConfig {
                    + " spark.rss.estimate.server.assignment.enabled"))
       
.createWithDefault(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);
 
+  public static final ConfigEntry<String> RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY 
= createStringBuilder(
+      new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX +  
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY)
+          .doc("The strategy for skip block when read from memory."))
+      
.createWithDefault(RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE);
+
+
   public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
       ImmutableSet.of(RSS_STORAGE_TYPE.key(), RSS_REMOTE_STORAGE_PATH.key());
 
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index cc6fe254..b08d01bf 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -45,6 +45,8 @@ import scala.runtime.BoxedUnit;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.BlockSkipStrategy;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
 
@@ -71,6 +73,8 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
   private List<ShuffleServerInfo> shuffleServerInfoList;
   private Configuration hadoopConf;
   private RssConf rssConf;
+  private final BlockSkipStrategy blockSkipStrategy;
+  private final int maxBlockIdRangeSegments;
 
   public RssShuffleReader(
       int startPartition,
@@ -107,6 +111,14 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
     this.shuffleServerInfoList =
         (List<ShuffleServerInfo>) 
(rssShuffleHandle.getPartitionToServers().get(startPartition));
     this.rssConf = rssConf;
+
+    BlockSkipStrategy blockSkipStrategy = BlockSkipStrategy.valueOf(
+        rssConf.getString(RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY,
+            
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE));
+    this.blockSkipStrategy = shuffleServerInfoList.size() <= 1 ? 
BlockSkipStrategy.NONE : blockSkipStrategy;
+
+    maxBlockIdRangeSegments = 
rssConf.getInteger(RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS,
+        
RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE);
   }
 
   @Override
@@ -115,7 +127,8 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
 
     CreateShuffleReadClientRequest request = new 
CreateShuffleReadClientRequest(
         appId, shuffleId, startPartition, storageType, basePath, 
indexReadLimit, readBufferSize,
-        partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, 
shuffleServerInfoList, hadoopConf);
+        partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, 
shuffleServerInfoList, hadoopConf,
+        blockSkipStrategy, maxBlockIdRangeSegments);
     ShuffleReadClient shuffleReadClient = 
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
     RssShuffleDataIterator rssShuffleDataIterator = new 
RssShuffleDataIterator<K, C>(
         shuffleDependency.serializer(), shuffleReadClient,
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 89e17664..31101262 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -49,6 +49,8 @@ import scala.runtime.BoxedUnit;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.request.CreateShuffleReadClientRequest;
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.BlockSkipStrategy;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssConf;
@@ -56,7 +58,6 @@ import org.apache.uniffle.common.config.RssConf;
 public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
   private static final Logger LOG = 
LoggerFactory.getLogger(RssShuffleReader.class);
   private final Map<Integer, List<ShuffleServerInfo>> 
partitionToShuffleServers;
-
   private String appId;
   private int shuffleId;
   private int startPartition;
@@ -78,7 +79,8 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
   private ShuffleReadMetrics readMetrics;
   private RssConf rssConf;
   private ShuffleDataDistributionType dataDistributionType;
-  private boolean expectedTaskIdsBitmapFilterEnable;
+  private final BlockSkipStrategy blockSkipStrategy;
+  private final int maxBlockIdRangeSegments;
 
   public RssShuffleReader(
       int startPartition,
@@ -120,9 +122,11 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
     this.partitionToShuffleServers = rssShuffleHandle.getPartitionToServers();
     this.rssConf = rssConf;
     this.dataDistributionType = dataDistributionType;
-    // This mechanism of expectedTaskIdsBitmap filter is to filter out the 
most of data.
-    // especially for AQE skew optimization
-    this.expectedTaskIdsBitmapFilterEnable = !(mapStartIndex == 0 && 
mapEndIndex == Integer.MAX_VALUE);
+    blockSkipStrategy = BlockSkipStrategy.valueOf(
+        rssConf.getString(RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY,
+            
RssClientConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE));
+    maxBlockIdRangeSegments = 
rssConf.getInteger(RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS,
+        
RssClientConfig.RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE);
   }
 
   @Override
@@ -207,10 +211,15 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
           continue;
         }
         List<ShuffleServerInfo> shuffleServerInfoList = 
partitionToShuffleServers.get(partition);
+        // If AQE is disable and the number of replica is 1, we should set 
BlockSkipStrategy to NONE
+        // for reduce data transmission
+        BlockSkipStrategy realBlockSkipStrategy = shuffleServerInfoList.size() 
<= 1
+            && mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE
+            ? BlockSkipStrategy.NONE : blockSkipStrategy;
         CreateShuffleReadClientRequest request = new 
CreateShuffleReadClientRequest(
             appId, shuffleId, partition, storageType, basePath, 
indexReadLimit, readBufferSize,
             1, partitionNum, partitionToExpectBlocks.get(partition), 
taskIdBitmap, shuffleServerInfoList,
-            hadoopConf, dataDistributionType, 
expectedTaskIdsBitmapFilterEnable);
+            hadoopConf, dataDistributionType, realBlockSkipStrategy, 
maxBlockIdRangeSegments);
         ShuffleReadClient shuffleReadClient = 
ShuffleClientFactory.getInstance().createShuffleReadClient(request);
         RssShuffleDataIterator iterator = new RssShuffleDataIterator<K, C>(
             shuffleDependency.serializer(), shuffleReadClient,
diff --git 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
index 4229fc77..547e6af0 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
@@ -87,7 +87,8 @@ public class ShuffleClientFactory {
         request.getHadoopConf(),
         request.getIdHelper(),
         request.getShuffleDataDistributionType(),
-        request.isExpectedTaskIdsBitmapFilterEnable()
+        request.getBlockSkipStrategy(),
+        request.getMaxBlockIdRangeSegments()
     );
   }
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 1cc80a41..eacefe78 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
 import org.apache.uniffle.client.util.IdHelper;
+import org.apache.uniffle.common.BlockSkipStrategy;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
@@ -78,7 +79,8 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
       Configuration hadoopConf,
       IdHelper idHelper,
       ShuffleDataDistributionType dataDistributionType,
-      boolean expectedTaskIdsBitmapFilterEnable) {
+      BlockSkipStrategy blockSkipStrategy,
+      int maxBlockIdRangeSegments) {
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.blockIdBitmap = blockIdBitmap;
@@ -102,9 +104,10 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
     request.setProcessBlockIds(processedBlockIds);
     request.setDistributionType(dataDistributionType);
     request.setExpectTaskIds(taskIdBitmap);
-    if (expectedTaskIdsBitmapFilterEnable) {
-      request.useExpectedTaskIdsBitmapFilter();
+    if (BlockSkipStrategy.BLOCKID_RANGE.equals(blockSkipStrategy)) {
+      request.setMaxBlockIdRangeSegments(maxBlockIdRangeSegments);
     }
+    request.setBlockSkipStrategy(blockSkipStrategy);
 
     List<Long> removeBlockIds = Lists.newArrayList();
     blockIdBitmap.forEach(bid -> {
@@ -141,7 +144,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
     this(storageType, appId, shuffleId, partitionId, indexReadLimit,
         partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
         blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
-        idHelper, ShuffleDataDistributionType.NORMAL, false);
+        idHelper, ShuffleDataDistributionType.NORMAL, 
BlockSkipStrategy.TASK_BITMAP, 0);
   }
 
   @Override
diff --git 
a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
 
b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
index db050304..3362245b 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/request/CreateShuffleReadClientRequest.java
@@ -24,6 +24,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.util.DefaultIdHelper;
 import org.apache.uniffle.client.util.IdHelper;
+import org.apache.uniffle.common.BlockSkipStrategy;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 
@@ -44,7 +45,8 @@ public class CreateShuffleReadClientRequest {
   private Configuration hadoopConf;
   private IdHelper idHelper;
   private ShuffleDataDistributionType shuffleDataDistributionType = 
ShuffleDataDistributionType.NORMAL;
-  private boolean expectedTaskIdsBitmapFilterEnable = false;
+  private BlockSkipStrategy blockSkipStrategy;
+  private int maxBlockIdRangeSegments;
 
   public CreateShuffleReadClientRequest(
       String appId,
@@ -61,12 +63,12 @@ public class CreateShuffleReadClientRequest {
       List<ShuffleServerInfo> shuffleServerInfoList,
       Configuration hadoopConf,
       ShuffleDataDistributionType dataDistributionType,
-      boolean expectedTaskIdsBitmapFilterEnable) {
+      BlockSkipStrategy blockSkipStrategy,
+      int maxBlockIdRangeSegments) {
     this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, 
readBufferSize,
         partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, 
shuffleServerInfoList,
-        hadoopConf, new DefaultIdHelper());
+        hadoopConf, new DefaultIdHelper(), blockSkipStrategy, 
maxBlockIdRangeSegments);
     this.shuffleDataDistributionType = dataDistributionType;
-    this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
   }
 
   public CreateShuffleReadClientRequest(
@@ -82,10 +84,12 @@ public class CreateShuffleReadClientRequest {
       Roaring64NavigableMap blockIdBitmap,
       Roaring64NavigableMap taskIdBitmap,
       List<ShuffleServerInfo> shuffleServerInfoList,
-      Configuration hadoopConf) {
+      Configuration hadoopConf,
+      BlockSkipStrategy blockSkipStrategy,
+      int maxBlockIdRangeSegments) {
     this(appId, shuffleId, partitionId, storageType, basePath, indexReadLimit, 
readBufferSize,
         partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, 
shuffleServerInfoList,
-        hadoopConf, new DefaultIdHelper());
+        hadoopConf, new DefaultIdHelper(), blockSkipStrategy, 
maxBlockIdRangeSegments);
   }
 
   public CreateShuffleReadClientRequest(
@@ -102,7 +106,9 @@ public class CreateShuffleReadClientRequest {
       Roaring64NavigableMap taskIdBitmap,
       List<ShuffleServerInfo> shuffleServerInfoList,
       Configuration hadoopConf,
-      IdHelper idHelper) {
+      IdHelper idHelper,
+      BlockSkipStrategy blockSkipStrategy,
+      int maxBlockIdRangeSegments) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
@@ -117,6 +123,8 @@ public class CreateShuffleReadClientRequest {
     this.shuffleServerInfoList = shuffleServerInfoList;
     this.hadoopConf = hadoopConf;
     this.idHelper = idHelper;
+    this.blockSkipStrategy = blockSkipStrategy;
+    this.maxBlockIdRangeSegments = maxBlockIdRangeSegments;
   }
 
   public String getAppId() {
@@ -179,7 +187,19 @@ public class CreateShuffleReadClientRequest {
     return shuffleDataDistributionType;
   }
 
-  public boolean isExpectedTaskIdsBitmapFilterEnable() {
-    return expectedTaskIdsBitmapFilterEnable;
+  public int getMaxBlockIdRangeSegments() {
+    return maxBlockIdRangeSegments;
+  }
+
+  public void setMaxBlockIdRangeSegments(int maxBlockIdRangeSegments) {
+    this.maxBlockIdRangeSegments = maxBlockIdRangeSegments;
+  }
+
+  public BlockSkipStrategy getBlockSkipStrategy() {
+    return blockSkipStrategy;
+  }
+
+  public void setBlockSkipStrategy(BlockSkipStrategy blockSkipStrategy) {
+    this.blockSkipStrategy = blockSkipStrategy;
   }
 }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java 
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index 5ab3a6fb..fa8375a2 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -61,7 +61,7 @@ public class RssClientConfig {
   // The tags specified by rss client to determine server assignment.
   public static final String RSS_CLIENT_ASSIGNMENT_TAGS = 
"rss.client.assignment.tags";
   public static final String RSS_TEST_MODE_ENABLE = "rss.test.mode.enable";
-  
+
   public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL = 
"rss.client.assignment.retry.interval";
   public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE 
= 65000;
   public static final String RSS_CLIENT_ASSIGNMENT_RETRY_TIMES = 
"rss.client.assignment.retry.times";
@@ -86,4 +86,11 @@ public class RssClientConfig {
   public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = 
"rss.estimate.task.concurrency.per.server";
   public static final int 
RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE = 80;
 
+  public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY = 
"rss.client.read.block.skip.strategy";
+
+  public static final String RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY_DEFAULT_VALUE 
= "TASK_BITMAP";
+
+  public static final String RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS =
+      "rss.client.read.block.skip.range.segments.max";
+  public static final int 
RSS_CLIENT_READ_FILTER_RANGE_MAX_SEGMENTS_DEFAULT_VALUE = 10;
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/BlockSkipStrategy.java 
b/common/src/main/java/org/apache/uniffle/common/BlockSkipStrategy.java
new file mode 100644
index 00000000..a07ad4e6
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/BlockSkipStrategy.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+public enum BlockSkipStrategy {
+  NONE,
+  TASK_BITMAP,
+  BLOCKID_RANGE
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index fb05894e..fb4d8bf5 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -31,6 +31,8 @@ import java.net.InetAddress;
 import java.net.InterfaceAddress;
 import java.net.NetworkInterface;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -38,11 +40,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.net.InetAddresses;
+import org.apache.commons.lang3.tuple.Pair;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -283,4 +287,92 @@ public class RssUtils {
           + " blocks, actual " + cloneBitmap.getLongCardinality() + " blocks");
     }
   }
+
+  /**
+   * Generate range segments for blockId bitmap
+   * @param blockIdBitmap blockId bitmap
+   * @param maxSegments Maximum number of segments to return
+   * @return Range segments.like [start1, end1, start2, end2]
+   */
+  public static List<Long> generateRangeSegments(Roaring64NavigableMap 
blockIdBitmap, int maxSegments) {
+    Iterator<Long> iterator = blockIdBitmap.iterator();
+    if (!iterator.hasNext()) {
+      return Collections.EMPTY_LIST;
+    }
+    List<Long> endPoints = Lists.newArrayList();
+    long lastId = iterator.next();
+    endPoints.add(lastId);
+    while (iterator.hasNext()) {
+      long blockId = iterator.next();
+      try {
+        if (blockId - lastId <= 1) {
+          continue;
+        }
+        endPoints.add(lastId);
+        endPoints.add(blockId);
+      } finally {
+        lastId = blockId;
+      }
+    }
+    if (endPoints.size() % 2 != 0) {
+      endPoints.add(lastId);
+    }
+    return mergeRangeSegments(endPoints, maxSegments);
+  }
+
+  /**
+   * Merge range segments
+   * @param endPoints EndPoints of all segments.
+   * @param maxSegments Maximum number of segments to return
+   * @return Merged segments
+   */
+  public static List<Long> mergeRangeSegments(List<Long> endPoints, int 
maxSegments) {
+    int maxPoints = maxSegments * 2;
+    if (endPoints.size() < 2 || endPoints.size() <= maxPoints) {
+      return endPoints;
+    }
+    // distance of two segments -> the offset of the second segment
+    List<Pair<Long, Integer>> distinces = Lists.newArrayList();
+    for (int i = 2; i < endPoints.size(); i += 2) {
+      long distance = endPoints.get(i) - endPoints.get(i - 1);
+      distinces.add(Pair.of(distance, i));
+    }
+    distinces.sort(Comparator.comparingLong(Pair::getLeft));
+
+    int mergeSegmentNum = endPoints.size() / 2 - maxSegments;
+    // Find the nearest segments top N(mergeSegmentNum)
+    List<Integer> indexsToMerge = distinces.stream().limit(mergeSegmentNum)
+        .map(e -> 
e.getValue()).sorted(Comparator.reverseOrder()).collect(Collectors.toList());
+    // Merge segments
+    for (Integer index : indexsToMerge) {
+      // Don't remove (int), or remove(Object o) will be invoked;
+      endPoints.remove((int)index);
+      endPoints.remove(index - 1);
+    }
+    return endPoints;
+  }
+
+  /**
+   * Check if the gived blockId inside the range segments. Use binary search
+   * @param rangeSegments Range segments.like [start1, end1, start2, end2]
+   * @param blockId BlockId to check
+   * @return true if blockId inside the range segments
+   */
+  public static boolean checkIfBlockInRange(List<Long> rangeSegments, Long 
blockId) {
+    int lower = 0;
+    int upper = rangeSegments.size() - 1;
+    Comparator<Long> comparator = Comparator.naturalOrder();
+    while (lower <= upper) {
+      int middle = (lower + upper) >>> 1;
+      int c = comparator.compare(blockId, rangeSegments.get(middle));
+      if (c < 0) {
+        upper = middle - 1;
+      } else if (c > 0) {
+        lower = middle + 1;
+      } else {
+        return true;
+      }
+    }
+    return lower % 2 != 0;
+  }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java 
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index a84e3902..36422d89 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Field;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.uniffle.common.ShuffleServerInfo;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -176,6 +178,98 @@ public class RssUtilsTest {
     assertEquals(serverToPartitions.get(server4), Sets.newHashSet(2, 4));
   }
 
+  @Test
+  public void testMergeRangeSegments() {
+    List<Long> endPoints = Lists.newArrayList(1L, 2L, 5L, 6L);
+    List<Long> results = RssUtils.mergeRangeSegments(endPoints, 1);
+    ArrayList<Long> expectResults = Lists.newArrayList(1L, 6L);
+    assertIterableEquals(results, expectResults);
+
+    endPoints = Lists.newArrayList(1L, 2L, 5L, 6L, 8L, 10L);
+    results = RssUtils.mergeRangeSegments(endPoints, 2);
+    expectResults = Lists.newArrayList(1L, 2L, 5L, 10L);
+    assertIterableEquals(results, expectResults);
+
+    endPoints = Lists.newArrayList(1L, 2L, 5L, 6L, 10L, 12L);
+    results = RssUtils.mergeRangeSegments(endPoints, 2);
+    expectResults = Lists.newArrayList(1L, 6L, 10L, 12L);
+    assertIterableEquals(results, expectResults);
+
+    endPoints = Lists.newArrayList(1L, 1L, 3L, 3L, 5L, 5L);
+    results = RssUtils.mergeRangeSegments(endPoints, 2);
+    expectResults = Lists.newArrayList(1L, 3L, 5L, 5L);
+    assertIterableEquals(results, expectResults);
+  }
+
+  @Test
+  public void testGenerateRangeSegments() {
+    Random random = new Random();
+    Roaring64NavigableMap bitmap = Roaring64NavigableMap.bitmapOf();
+    for (int i = 0; i < 1000; i++) {
+      if (random.nextInt(10) < 3) {
+        continue;
+      }
+      bitmap.add(i);
+    }
+    int maxSegments = 3;
+    List<Long> segments = RssUtils.generateRangeSegments(bitmap, 3);
+    assertEquals(maxSegments * 2, segments.size());
+
+    List<Long> endPoints = Lists.newArrayList(1L, 2L, 5L, 6L);
+    bitmap = Roaring64NavigableMap.bitmapOf();
+    for (Long endPoint : endPoints) {
+      bitmap.add(endPoint);
+    }
+    List<Long> results = RssUtils.generateRangeSegments(bitmap, 100);
+    List<Long> expectResults = Lists.newArrayList(1L, 2L, 5L, 6L);
+    assertIterableEquals(results, expectResults);
+
+    bitmap = Roaring64NavigableMap.bitmapOf();
+    endPoints = Lists.newArrayList(1L, 2L, 5L, 6L, 8L, 9L, 10L);
+    for (Long endPoint : endPoints) {
+      bitmap.add(endPoint);
+    }
+    results = RssUtils.generateRangeSegments(bitmap, 100);
+    expectResults = Lists.newArrayList(1L, 2L, 5L, 6L, 8L, 10L);
+    assertIterableEquals(results, expectResults);
+
+    bitmap = Roaring64NavigableMap.bitmapOf();
+    endPoints = Lists.newArrayList(1L, 2L, 5L, 6L, 10L, 11L, 12L);
+    for (Long endPoint : endPoints) {
+      bitmap.add(endPoint);
+    }
+    results = RssUtils.generateRangeSegments(bitmap, 100);
+    expectResults = Lists.newArrayList(1L, 2L, 5L, 6L, 10L, 12L);
+    assertIterableEquals(results, expectResults);
+
+    bitmap = Roaring64NavigableMap.bitmapOf();
+    endPoints = Lists.newArrayList(1L, 1L, 3L, 3L, 5L, 5L);
+    for (Long endPoint : endPoints) {
+      bitmap.add(endPoint);
+    }
+    results = RssUtils.generateRangeSegments(bitmap, 100);
+    expectResults = Lists.newArrayList(1L, 1L, 3L, 3L, 5L, 5L);
+    assertIterableEquals(results, expectResults);
+  }
+
+  @Test
+  public void testCheckIfBlockInRange() {
+    List<Long> rangeSegments = Lists.newArrayList(1L, 2L, 5L, 6L, 10L, 12L);
+    Roaring64NavigableMap bitmap = Roaring64NavigableMap.bitmapOf();
+    for (Long element : rangeSegments) {
+      bitmap.add(element);
+    }
+    assertFalse(RssUtils.checkIfBlockInRange(rangeSegments, 13L));
+    assertTrue(RssUtils.checkIfBlockInRange(rangeSegments, 10L));
+    assertFalse(RssUtils.checkIfBlockInRange(rangeSegments, 9L));
+    assertTrue(RssUtils.checkIfBlockInRange(rangeSegments, 5L));
+    assertFalse(RssUtils.checkIfBlockInRange(rangeSegments, 4L));
+    assertFalse(RssUtils.checkIfBlockInRange(rangeSegments, 3L));
+    assertTrue(RssUtils.checkIfBlockInRange(rangeSegments, 2L));
+    assertTrue(RssUtils.checkIfBlockInRange(rangeSegments, 1L));
+    assertFalse(RssUtils.checkIfBlockInRange(rangeSegments, 0L));
+  }
+
   // Copy from ClientUtils
   private Long getBlockId(long partitionId, long taskAttemptId, long 
atomicInt) {
     return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + 
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index accc280f..551b9be6 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -34,6 +34,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
 import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.common.BlockSkipStrategy;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -306,6 +307,67 @@ public class ShuffleServerWithMemoryTest extends 
ShuffleReadWriteBase {
     assertNull(sdr);
   }
 
+
+  @Test
+  public void memoryReadTestWithSkipByBlockIdRange() throws Exception {
+    String testAppId = "memoryReadTestWithSkipByBlockIdRange";
+    int shuffleId = 0;
+    int partitionId = 0;
+    RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(testAppId, 
0,
+        Lists.newArrayList(new PartitionRange(0, 0)), "");
+    shuffleServerClient.registerShuffle(rrsr);
+    Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
+    Map<Long, byte[]> dataMap = Maps.newHashMap();
+    Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[1];
+    bitmaps[0] = Roaring64NavigableMap.bitmapOf();
+    List<ShuffleBlockInfo> blocks = createShuffleBlockList(
+        shuffleId, partitionId, 0, 3, 25,
+        expectBlockIds, dataMap, mockSSI);
+    Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
+    partitionToBlocks.put(partitionId, blocks);
+    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = 
Maps.newHashMap();
+    shuffleToBlocks.put(shuffleId, partitionToBlocks);
+
+    // send data to shuffle server
+    RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(
+        testAppId, 3, 1000, shuffleToBlocks);
+    shuffleServerClient.sendShuffleData(rssdr);
+
+    // data is cached
+    assertEquals(3, shuffleServers.get(0).getShuffleBufferManager()
+        .getShuffleBuffer(testAppId, shuffleId, 0).getBlocks().size());
+
+    Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
+
+    MemoryClientReadHandler memoryClientReadHandler = new 
MemoryClientReadHandler(
+        testAppId, shuffleId, partitionId, 20, shuffleServerClient, null,
+        expectBlockIds, processBlockIds, BlockSkipStrategy.BLOCKID_RANGE, 3);
+
+    ShuffleDataResult sdr  = memoryClientReadHandler.readShuffleData();
+    Map<Long, byte[]> expectedData = Maps.newHashMap();
+    expectedData.put(blocks.get(0).getBlockId(), blocks.get(0).getData());
+    validateResult(expectedData, sdr);
+
+    // skip the first block
+    processBlockIds.add(blocks.get(0).getBlockId());
+    MemoryClientReadHandler memoryClientReadHandler2 = new 
MemoryClientReadHandler(
+        testAppId, shuffleId, partitionId, 20, shuffleServerClient, null,
+        expectBlockIds, processBlockIds, BlockSkipStrategy.BLOCKID_RANGE, 3);
+    sdr = memoryClientReadHandler2.readShuffleData();
+    expectedData = Maps.newHashMap();
+    expectedData.put(blocks.get(1).getBlockId(), blocks.get(1).getData());
+    validateResult(expectedData, sdr);
+
+    // skip all blocks
+    blocks.forEach((block) -> processBlockIds.add(block.getBlockId()));
+    MemoryClientReadHandler memoryClientReadHandler3 = new 
MemoryClientReadHandler(
+        testAppId, shuffleId, partitionId, 20, shuffleServerClient, null,
+        expectBlockIds, processBlockIds, BlockSkipStrategy.BLOCKID_RANGE, 3);
+    sdr = memoryClientReadHandler3.readShuffleData();
+    assertNull(sdr);
+  }
+
+
   protected void validateResult(
       Map<Long, byte[]> expectedData,
       ShuffleDataResult sdr) {
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithBlockIdRangeSkipStrategyTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithBlockIdRangeSkipStrategyTest.java
new file mode 100644
index 00000000..de8d16e8
--- /dev/null
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinWithBlockIdRangeSkipStrategyTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.RssSparkConfig;
+
+import org.apache.uniffle.common.BlockSkipStrategy;
+
+public class AQESkewedJoinWithBlockIdRangeSkipStrategyTest extends 
AQESkewedJoinTest {
+
+  @Override
+  public void updateSparkConfCustomer(SparkConf sparkConf) {
+    super.updateSparkConfCustomer(sparkConf);
+    sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BLOCK_SKIP_STRATEGY.key(), 
BlockSkipStrategy.BLOCKID_RANGE.name());
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 74fe2767..7f1b4d41 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -619,6 +619,7 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         .setLastBlockId(request.getLastBlockId())
         .setReadBufferSize(request.getReadBufferSize())
         .setSerializedExpectedTaskIdsBitmap(serializedTaskIdsBytes)
+        .addAllExpectedBlockIdRange(request.getExpectedBlockIdRange())
         .setTimestamp(start)
         .build();
 
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetInMemoryShuffleDataRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetInMemoryShuffleDataRequest.java
index 87c3d2f1..b3ff0d82 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetInMemoryShuffleDataRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssGetInMemoryShuffleDataRequest.java
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.client.request;
 
+import java.util.List;
+
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 public class RssGetInMemoryShuffleDataRequest {
@@ -27,15 +29,18 @@ public class RssGetInMemoryShuffleDataRequest {
   private final int readBufferSize;
   private final Roaring64NavigableMap expectedTaskIds;
 
+  private List<Long> expectedBlockIdRange;
+
   public RssGetInMemoryShuffleDataRequest(
       String appId, int shuffleId, int partitionId, long lastBlockId, int 
readBufferSize,
-      Roaring64NavigableMap expectedTaskIds) {
+      Roaring64NavigableMap expectedTaskIds, List<Long> expectedBlockIdRange) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.lastBlockId = lastBlockId;
     this.readBufferSize = readBufferSize;
     this.expectedTaskIds = expectedTaskIds;
+    this.expectedBlockIdRange = expectedBlockIdRange;
   }
 
   public String getAppId() {
@@ -61,4 +66,8 @@ public class RssGetInMemoryShuffleDataRequest {
   public Roaring64NavigableMap getExpectedTaskIds() {
     return expectedTaskIds;
   }
+
+  public List<Long> getExpectedBlockIdRange() {
+    return expectedBlockIdRange;
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index db3c6a07..e2c081a1 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -93,6 +93,7 @@ message GetMemoryShuffleDataRequest {
   int32 readBufferSize = 5;
   int64 timestamp = 6;
   optional bytes serializedExpectedTaskIdsBitmap = 7;
+  repeated int64 expectedBlockIdRange = 8;
 }
 
 message GetMemoryShuffleDataResponse {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 3de560ce..fd100b2e 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -662,6 +662,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               request.getSerializedExpectedTaskIdsBitmap().toByteArray()
           );
         }
+        List<Long> expectedBlockIdRange = 
request.getExpectedBlockIdRangeList();
         ShuffleDataResult shuffleDataResult = shuffleServer
             .getShuffleTaskManager()
             .getInMemoryShuffleData(
@@ -670,7 +671,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
                 partitionId,
                 blockId,
                 readBufferSize,
-                expectedTaskIds
+                expectedTaskIds,
+                expectedBlockIdRange
             );
         byte[] data = new byte[]{};
         List<BufferSegment> bufferSegments = Lists.newArrayList();
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 9b3c64e2..71371ae8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -364,9 +364,9 @@ public class ShuffleTaskManager {
 
   public ShuffleDataResult getInMemoryShuffleData(
       String appId, Integer shuffleId, Integer partitionId, long blockId, int 
readBufferSize,
-      Roaring64NavigableMap expectedTaskIds) {
+      Roaring64NavigableMap expectedTaskIds, List<Long> expectedBlockIdRange) {
     return shuffleBufferManager.getShuffleData(appId,
-        shuffleId, partitionId, blockId, readBufferSize, expectedTaskIds);
+        shuffleId, partitionId, blockId, readBufferSize, expectedTaskIds, 
expectedBlockIdRange);
   }
 
   public ShuffleDataResult getShuffleData(
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 874bcaae..9b0a50a2 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleFlushManager;
 
@@ -149,16 +151,22 @@ public class ShuffleBuffer {
     return getShuffleData(lastBlockId, readBufferSize, null);
   }
 
+  public synchronized ShuffleDataResult getShuffleData(
+      long lastBlockId, int readBufferSize, Roaring64NavigableMap 
expectedTaskIds) {
+    return getShuffleData(lastBlockId, readBufferSize, expectedTaskIds, null);
+  }
+
   // 1. generate buffer segments and other info: if blockId exist, start with 
which eventId
   // 2. according to info from step 1, generate data
   // todo: if block was flushed, it's possible to get duplicated data
   public synchronized ShuffleDataResult getShuffleData(
-      long lastBlockId, int readBufferSize, Roaring64NavigableMap 
expectedTaskIds) {
+      long lastBlockId, int readBufferSize, Roaring64NavigableMap 
expectedTaskIds,
+      List<Long> expectedBlockIdRange) {
     try {
       List<BufferSegment> bufferSegments = Lists.newArrayList();
       List<ShufflePartitionedBlock> readBlocks = Lists.newArrayList();
       updateBufferSegmentsAndResultBlocks(
-          lastBlockId, readBufferSize, bufferSegments, readBlocks, 
expectedTaskIds);
+          lastBlockId, readBufferSize, bufferSegments, readBlocks, 
expectedTaskIds, expectedBlockIdRange);
       if (!bufferSegments.isEmpty()) {
         int length = calculateDataLength(bufferSegments);
         byte[] data = new byte[length];
@@ -180,7 +188,8 @@ public class ShuffleBuffer {
       long readBufferSize,
       List<BufferSegment> bufferSegments,
       List<ShufflePartitionedBlock> resultBlocks,
-      Roaring64NavigableMap expectedTaskIds) {
+      Roaring64NavigableMap expectedTaskIds,
+      List<Long> expectedBlockIdRange) {
     long nextBlockId = lastBlockId;
     List<Long> sortedEventId = sortFlushingEventId();
     int offset = 0;
@@ -194,11 +203,11 @@ public class ShuffleBuffer {
         // update bufferSegments with different strategy according to 
lastBlockId
         if (nextBlockId == Constants.INVALID_BLOCK_ID) {
           updateSegmentsWithoutBlockId(offset, inFlushBlockMap.get(eventId), 
readBufferSize,
-              bufferSegments, resultBlocks, expectedTaskIds);
+              bufferSegments, resultBlocks, expectedTaskIds, 
expectedBlockIdRange);
           hasLastBlockId = true;
         } else {
           hasLastBlockId = updateSegmentsWithBlockId(offset, 
inFlushBlockMap.get(eventId),
-              readBufferSize, nextBlockId, bufferSegments, resultBlocks, 
expectedTaskIds);
+              readBufferSize, nextBlockId, bufferSegments, resultBlocks, 
expectedTaskIds, expectedBlockIdRange);
           // if last blockId is found, read from begin with next cached blocks
           if (hasLastBlockId) {
             // reset blockId to read from begin in next cached blocks
@@ -216,11 +225,12 @@ public class ShuffleBuffer {
     // try to read from cached blocks which is not in flush queue
     if (blocks.size() > 0 && offset < readBufferSize) {
       if (nextBlockId == Constants.INVALID_BLOCK_ID) {
-        updateSegmentsWithoutBlockId(offset, blocks, readBufferSize, 
bufferSegments, resultBlocks, expectedTaskIds);
+        updateSegmentsWithoutBlockId(offset, blocks, readBufferSize, 
bufferSegments,
+            resultBlocks, expectedTaskIds, expectedBlockIdRange);
         hasLastBlockId = true;
       } else {
         hasLastBlockId = updateSegmentsWithBlockId(offset, blocks,
-            readBufferSize, nextBlockId, bufferSegments, resultBlocks, 
expectedTaskIds);
+            readBufferSize, nextBlockId, bufferSegments, resultBlocks, 
expectedTaskIds, expectedBlockIdRange);
       }
     }
     if ((!inFlushBlockMap.isEmpty() || blocks.size() > 0) && offset == 0 && 
!hasLastBlockId) {
@@ -228,7 +238,8 @@ public class ShuffleBuffer {
       // but there still has data in memory
       // try read again with blockId = Constants.INVALID_BLOCK_ID
       updateBufferSegmentsAndResultBlocks(
-          Constants.INVALID_BLOCK_ID, readBufferSize, bufferSegments, 
resultBlocks, expectedTaskIds);
+          Constants.INVALID_BLOCK_ID, readBufferSize, bufferSegments,
+          resultBlocks, expectedTaskIds, expectedBlockIdRange);
     }
   }
 
@@ -270,13 +281,19 @@ public class ShuffleBuffer {
       long readBufferSize,
       List<BufferSegment> bufferSegments,
       List<ShufflePartitionedBlock> readBlocks,
-      Roaring64NavigableMap expectedTaskIds) {
+      Roaring64NavigableMap expectedTaskIds,
+      List<Long> expectedBlockIdRange) {
     int currentOffset = offset;
     // read from first block
     for (ShufflePartitionedBlock block : cachedBlocks) {
       if (expectedTaskIds != null && 
!expectedTaskIds.contains(block.getTaskAttemptId())) {
         continue;
       }
+
+      if (CollectionUtils.isNotEmpty(expectedBlockIdRange)
+          && !RssUtils.checkIfBlockInRange(expectedBlockIdRange, 
block.getBlockId())) {
+        continue;
+      }
       // add bufferSegment with block
       bufferSegments.add(new BufferSegment(block.getBlockId(), currentOffset, 
block.getLength(),
           block.getUncompressLength(), block.getCrc(), 
block.getTaskAttemptId()));
@@ -297,7 +314,8 @@ public class ShuffleBuffer {
       long lastBlockId,
       List<BufferSegment> bufferSegments,
       List<ShufflePartitionedBlock> readBlocks,
-      Roaring64NavigableMap expectedTaskIds) {
+      Roaring64NavigableMap expectedTaskIds,
+      List<Long> expectedBlockIdRange) {
     int currentOffset = offset;
     // find lastBlockId, then read from next block
     boolean foundBlockId = false;
@@ -312,6 +330,10 @@ public class ShuffleBuffer {
       if (expectedTaskIds != null && 
!expectedTaskIds.contains(block.getTaskAttemptId())) {
         continue;
       }
+      if (CollectionUtils.isNotEmpty(expectedBlockIdRange)
+          && !RssUtils.checkIfBlockInRange(expectedBlockIdRange, 
block.getBlockId())) {
+        continue;
+      }
       // add bufferSegment with block
       bufferSegments.add(new BufferSegment(block.getBlockId(), currentOffset, 
block.getLength(),
           block.getUncompressLength(), block.getCrc(), 
block.getTaskAttemptId()));
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index b606d30c..ddad74b3 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -158,13 +158,15 @@ public class ShuffleBufferManager {
         partitionId,
         blockId,
         readBufferSize,
+        null,
         null
     );
   }
 
   public ShuffleDataResult getShuffleData(
       String appId, int shuffleId, int partitionId, long blockId,
-      int readBufferSize, Roaring64NavigableMap expectedTaskIds) {
+      int readBufferSize, Roaring64NavigableMap expectedTaskIds,
+      List<Long> expectedBlockIdRange) {
     Map.Entry<Range<Integer>, ShuffleBuffer> entry = getShuffleBufferEntry(
         appId, shuffleId, partitionId);
     if (entry == null) {
@@ -175,7 +177,7 @@ public class ShuffleBufferManager {
     if (buffer == null) {
       return null;
     }
-    return buffer.getShuffleData(blockId, readBufferSize, expectedTaskIds);
+    return buffer.getShuffleData(blockId, readBufferSize, expectedTaskIds, 
expectedBlockIdRange);
   }
 
   void flushSingleBufferIfNecessary(ShuffleBuffer buffer, String appId,
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 1e6d056d..60f39c33 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.RangeMap;
 import com.google.common.io.Files;
 import org.junit.jupiter.api.BeforeEach;
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.server.ShuffleFlushManager;
@@ -122,7 +124,8 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
         0,
         Constants.INVALID_BLOCK_ID,
         60,
-        Roaring64NavigableMap.bitmapOf(1)
+        Roaring64NavigableMap.bitmapOf(1),
+        null
     );
     assertEquals(1, result.getBufferSegments().size());
     assertEquals(0, result.getBufferSegments().get(0).getOffset());
@@ -136,7 +139,8 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
         0,
         lastBlockId,
         60,
-        Roaring64NavigableMap.bitmapOf(1)
+        Roaring64NavigableMap.bitmapOf(1),
+        null
     );
     assertEquals(1, result.getBufferSegments().size());
     assertEquals(0, result.getBufferSegments().get(0).getOffset());
@@ -164,13 +168,22 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     assertEquals(100, bufferPool.get(appId).get(1).get(0).getSize());
     assertEquals(200, bufferPool.get(appId).get(2).get(0).getSize());
     assertEquals(100, bufferPool.get(appId).get(3).get(0).getSize());
+    Roaring64NavigableMap processedBlockIds = Roaring64NavigableMap.bitmapOf();
+    Roaring64NavigableMap exceptTaskIds = Roaring64NavigableMap.bitmapOf();
+    Lists.newArrayList(spd1, spd2, spd3, spd4).forEach((spd) -> {
+      for (ShufflePartitionedBlock shufflePartitionedBlock : 
spd.getBlockList()) {
+        exceptTaskIds.add(shufflePartitionedBlock.getTaskAttemptId());
+      }
+    });
+
     // validate get shuffle data
     ShuffleDataResult sdr = shuffleBufferManager.getShuffleData(
-        appId, 2, 0, Constants.INVALID_BLOCK_ID, 60);
+        appId, 2, 0, Constants.INVALID_BLOCK_ID, 60, exceptTaskIds, null);
     assertArrayEquals(spd2.getBlockList()[0].getData(), sdr.getData());
     long lastBlockId = spd2.getBlockList()[0].getBlockId();
+    processedBlockIds = Roaring64NavigableMap.bitmapOf();
     sdr = shuffleBufferManager.getShuffleData(
-        appId, 2, 0, lastBlockId, 100);
+        appId, 2, 0, lastBlockId, 100, exceptTaskIds, null);
     assertArrayEquals(spd3.getBlockList()[0].getData(), sdr.getData());
     // flush happen
     ShufflePartitionedData spd5 = createData(0, 10);
@@ -185,12 +198,14 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     // keep buffer whose size < low water mark
     assertEquals(1, bufferPool.get(appId).get(4).get(0).getBlocks().size());
     // data in flush buffer now, it also can be got before flush finish
+    processedBlockIds = Roaring64NavigableMap.bitmapOf();
     sdr = shuffleBufferManager.getShuffleData(
-        appId, 2, 0, Constants.INVALID_BLOCK_ID, 60);
+        appId, 2, 0, Constants.INVALID_BLOCK_ID, 60, exceptTaskIds, null);
     assertArrayEquals(spd2.getBlockList()[0].getData(), sdr.getData());
     lastBlockId = spd2.getBlockList()[0].getBlockId();
+    processedBlockIds = Roaring64NavigableMap.bitmapOf();
     sdr = shuffleBufferManager.getShuffleData(
-        appId, 2, 0, lastBlockId, 100);
+        appId, 2, 0, lastBlockId, 100, exceptTaskIds, null);
     assertArrayEquals(spd3.getBlockList()[0].getData(), sdr.getData());
     // cache data again, it should cause flush
     spd1 = createData(0, 10);
@@ -201,12 +216,14 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     bufferPool.get(appId).get(2).get(0).getInFlushBlockMap().clear();
     bufferPool.get(appId).get(3).get(0).getInFlushBlockMap().clear();
     // empty data return
+    processedBlockIds = Roaring64NavigableMap.bitmapOf();
     sdr = shuffleBufferManager.getShuffleData(
-        appId, 2, 0, Constants.INVALID_BLOCK_ID, 60);
+        appId, 2, 0, Constants.INVALID_BLOCK_ID, 60, exceptTaskIds, null);
     assertEquals(0, sdr.getData().length);
     lastBlockId = spd2.getBlockList()[0].getBlockId();
+    processedBlockIds = Roaring64NavigableMap.bitmapOf();
     sdr = shuffleBufferManager.getShuffleData(
-        appId, 2, 0, lastBlockId, 100);
+        appId, 2, 0, lastBlockId, 100, exceptTaskIds, null);
     assertEquals(0, sdr.getData().length);
   }
 
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
index a45f1a55..a8513cff 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferTest.java
@@ -83,6 +83,17 @@ public class ShuffleBufferTest extends BufferTestBase {
     assertEquals(0, shuffleBuffer.getBlocks().size());
   }
 
+  private ShuffleDataResult getShuffleData(ShuffleBuffer shuffleBuffer, long 
lastBlockId,
+                                           int readBufferSize, 
ShufflePartitionedData... spds) {
+    Roaring64NavigableMap exceptTaskIds = Roaring64NavigableMap.bitmapOf();
+    for (ShufflePartitionedData spd : spds) {
+      for (ShufflePartitionedBlock shufflePartitionedBlock : 
spd.getBlockList()) {
+        exceptTaskIds.add(shufflePartitionedBlock.getTaskAttemptId());
+      }
+    }
+    return shuffleBuffer.getShuffleData(lastBlockId, readBufferSize, 
exceptTaskIds);
+  }
+
   @Test
   public void getShuffleDataWithExpectedTaskIdsFilterTest() {
     /**
@@ -255,7 +266,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     shuffleBuffer.append(spd1);
     shuffleBuffer.append(spd2);
     byte[] expectedData = getExpectedData(spd1, spd2);
-    ShuffleDataResult sdr = 
shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
+    ShuffleDataResult sdr = getShuffleData(shuffleBuffer, 
Constants.INVALID_BLOCK_ID, 40, spd1, spd2);
     compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(), 
0, 2);
     assertArrayEquals(expectedData, sdr.getData());
 
@@ -266,7 +277,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     shuffleBuffer.append(spd1);
     shuffleBuffer.append(spd2);
     expectedData = getExpectedData(spd1, spd2);
-    sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
+    sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 40, spd1, 
spd2);
     compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(), 
0, 2);
     assertArrayEquals(expectedData, sdr.getData());
 
@@ -277,7 +288,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     shuffleBuffer.append(spd1);
     shuffleBuffer.append(spd2);
     expectedData = getExpectedData(spd1, spd2);
-    sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 40);
+    sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 40, spd1, 
spd2);
     compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(), 
0, 2);
     assertArrayEquals(expectedData, sdr.getData());
 
@@ -290,13 +301,13 @@ public class ShuffleBufferTest extends BufferTestBase {
     shuffleBuffer.append(spd2);
     shuffleBuffer.append(spd3);
     expectedData = getExpectedData(spd1, spd2);
-    sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 25);
+    sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 25, spd1, 
spd2);
     compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(), 
0, 2);
     assertArrayEquals(expectedData, sdr.getData());
 
     // case4: cached data only, blockId != -1 && exist, readBufferSize < 
buffer size
     long lastBlockId = spd2.getBlockList()[0].getBlockId();
-    sdr = shuffleBuffer.getShuffleData(lastBlockId, 25);
+    sdr = getShuffleData(shuffleBuffer, lastBlockId, 25, spd3);
     expectedData = getExpectedData(spd3);
     compareBufferSegment(shuffleBuffer.getBlocks(), sdr.getBufferSegments(), 
2, 1);
     assertArrayEquals(expectedData, sdr.getData());
@@ -309,19 +320,19 @@ public class ShuffleBufferTest extends BufferTestBase {
     shuffleBuffer.append(spd2);
     ShuffleDataFlushEvent event1 = shuffleBuffer.toFlushEvent("appId", 0, 0, 
1, null);
     assertEquals(0, shuffleBuffer.getBlocks().size());
-    sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 20);
+    sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 20, spd1, 
spd2);
     
compareBufferSegment(shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()),
         sdr.getBufferSegments(), 0, 2);
     expectedData = getExpectedData(spd1, spd2);
     assertArrayEquals(expectedData, sdr.getData());
 
     // case5: flush data only, blockId = lastBlockId
-    sdr = shuffleBuffer.getShuffleData(spd2.getBlockList()[0].getBlockId(), 
20);
+    sdr = getShuffleData(shuffleBuffer, spd2.getBlockList()[0].getBlockId(), 
20, spd1, spd2);
     assertEquals(0, sdr.getBufferSegments().size());
 
     // case6: no data in buffer & flush buffer
     shuffleBuffer = new ShuffleBuffer(200);
-    sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
+    sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 10, spd1, 
spd2);
     assertEquals(0, sdr.getBufferSegments().size());
     assertEquals(0, sdr.getData().length);
 
@@ -374,7 +385,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     List<ShufflePartitionedBlock> expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
     expectedData = getExpectedData(spd1);
-    sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
+    sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 10, spd1);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 0, 1);
     assertArrayEquals(expectedData, sdr.getData());
@@ -383,7 +394,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
     expectedData = getExpectedData(spd2);
-    sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(), 
10);
+    sdr = getShuffleData(shuffleBuffer, spd1.getBlockList()[0].getBlockId(), 
10, spd2);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 1, 1);
     assertArrayEquals(expectedData, sdr.getData());
@@ -392,7 +403,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
     expectedData = getExpectedData(spd1, spd2);
-    sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 20);
+    sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 20, spd1, 
spd2);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 0, 2);
     assertArrayEquals(expectedData, sdr.getData());
@@ -401,7 +412,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
     expectedData = getExpectedData(spd2, spd3);
-    sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(), 
20);
+    sdr = getShuffleData(shuffleBuffer, spd1.getBlockList()[0].getBlockId(), 
20, spd2, spd3);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 1, 2);
     assertArrayEquals(expectedData, sdr.getData());
@@ -411,7 +422,7 @@ public class ShuffleBufferTest extends BufferTestBase {
         shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     expectedData = getExpectedData(spd1, spd2, spd3, spd4);
-    sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 50);
+    sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 50, spd1, 
spd2, spd3, spd4);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 0, 4);
     assertArrayEquals(expectedData, sdr.getData());
@@ -421,7 +432,7 @@ public class ShuffleBufferTest extends BufferTestBase {
         shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     expectedData = getExpectedData(spd2, spd3);
-    sdr = shuffleBuffer.getShuffleData(spd1.getBlockList()[0].getBlockId(), 
20);
+    sdr = getShuffleData(shuffleBuffer, spd1.getBlockList()[0].getBlockId(), 
20, spd2, spd3);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 1, 2);
     assertArrayEquals(expectedData, sdr.getData());
@@ -430,7 +441,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     expectedData = getExpectedData(spd4);
-    sdr = shuffleBuffer.getShuffleData(spd3.getBlockList()[0].getBlockId(), 
10);
+    sdr = getShuffleData(shuffleBuffer, spd3.getBlockList()[0].getBlockId(), 
10, spd4);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 0, 1);
     assertArrayEquals(expectedData, sdr.getData());
@@ -439,7 +450,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     expectedData = getExpectedData(spd6);
-    sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(), 
10);
+    sdr = getShuffleData(shuffleBuffer, spd5.getBlockList()[0].getBlockId(), 
10, spd6);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 2, 1);
     assertArrayEquals(expectedData, sdr.getData());
@@ -448,7 +459,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     expectedData = getExpectedData(spd4, spd5, spd6);
-    sdr = shuffleBuffer.getShuffleData(spd3.getBlockList()[0].getBlockId(), 
40);
+    sdr = getShuffleData(shuffleBuffer, spd3.getBlockList()[0].getBlockId(), 
40, spd4, spd5, spd6);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 0, 3);
     assertArrayEquals(expectedData, sdr.getData());
@@ -459,7 +470,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
     expectedData = getExpectedData(spd3, spd4, spd5, spd6, spd7);
-    sdr = shuffleBuffer.getShuffleData(spd2.getBlockList()[0].getBlockId(), 
70);
+    sdr = getShuffleData(shuffleBuffer, spd2.getBlockList()[0].getBlockId(), 
70, spd3, spd4, spd5, spd6, spd7);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 2, 5);
     assertArrayEquals(expectedData, sdr.getData());
@@ -469,7 +480,7 @@ public class ShuffleBufferTest extends BufferTestBase {
         shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
     expectedData = getExpectedData(spd6, spd7);
-    sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(), 
20);
+    sdr = getShuffleData(shuffleBuffer, spd5.getBlockList()[0].getBlockId(), 
20, spd6, spd7);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 2, 2);
     assertArrayEquals(expectedData, sdr.getData());
@@ -479,7 +490,7 @@ public class ShuffleBufferTest extends BufferTestBase {
         shuffleBuffer.getInFlushBlockMap().get(event2.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
     expectedData = getExpectedData(spd6, spd7, spd8, spd9);
-    sdr = shuffleBuffer.getShuffleData(spd5.getBlockList()[0].getBlockId(), 
50);
+    sdr = getShuffleData(shuffleBuffer, spd5.getBlockList()[0].getBlockId(), 
50, spd6, spd7, spd8, spd9);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 2, 4);
     assertArrayEquals(expectedData, sdr.getData());
@@ -489,7 +500,7 @@ public class ShuffleBufferTest extends BufferTestBase {
         shuffleBuffer.getInFlushBlockMap().get(event3.getEventId()));
     
expectedBlocks.addAll(shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
     expectedData = getExpectedData(spd9, spd10);
-    sdr = shuffleBuffer.getShuffleData(spd8.getBlockList()[0].getBlockId(), 
20);
+    sdr = getShuffleData(shuffleBuffer, spd8.getBlockList()[0].getBlockId(), 
20, spd9, spd10);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 2, 2);
     assertArrayEquals(expectedData, sdr.getData());
@@ -498,7 +509,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
     expectedData = getExpectedData(spd10);
-    sdr = shuffleBuffer.getShuffleData(spd9.getBlockList()[0].getBlockId(), 
10);
+    sdr = getShuffleData(shuffleBuffer, spd9.getBlockList()[0].getBlockId(), 
10, spd10);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 0, 1);
     assertArrayEquals(expectedData, sdr.getData());
@@ -507,7 +518,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
     expectedData = getExpectedData(spd12);
-    sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(), 
10);
+    sdr = getShuffleData(shuffleBuffer, spd11.getBlockList()[0].getBlockId(), 
10, spd12);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 2, 1);
     assertArrayEquals(expectedData, sdr.getData());
@@ -516,7 +527,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
     expectedData = getExpectedData(spd10, spd11, spd12);
-    sdr = shuffleBuffer.getShuffleData(spd9.getBlockList()[0].getBlockId(), 
40);
+    sdr = getShuffleData(shuffleBuffer, spd9.getBlockList()[0].getBlockId(), 
40, spd10, spd11, spd12);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 0, 3);
     assertArrayEquals(expectedData, sdr.getData());
@@ -526,7 +537,7 @@ public class ShuffleBufferTest extends BufferTestBase {
         shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
     expectedData = getExpectedData(spd12, spd13);
-    sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(), 
20);
+    sdr = getShuffleData(shuffleBuffer, spd11.getBlockList()[0].getBlockId(), 
20, spd12, spd13);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 2, 2);
     assertArrayEquals(expectedData, sdr.getData());
@@ -535,7 +546,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
     expectedData = getExpectedData(spd13);
-    sdr = shuffleBuffer.getShuffleData(spd12.getBlockList()[0].getBlockId(), 
10);
+    sdr = getShuffleData(shuffleBuffer, spd12.getBlockList()[0].getBlockId(), 
10, spd13);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 0, 1);
     assertArrayEquals(expectedData, sdr.getData());
@@ -544,7 +555,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
     expectedData = getExpectedData(spd14, spd15);
-    sdr = shuffleBuffer.getShuffleData(spd13.getBlockList()[0].getBlockId(), 
20);
+    sdr = getShuffleData(shuffleBuffer, spd13.getBlockList()[0].getBlockId(), 
20, spd14, spd15);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 1, 2);
     assertArrayEquals(expectedData, sdr.getData());
@@ -553,7 +564,7 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks = Lists.newArrayList(shuffleBuffer.getBlocks());
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
     expectedData = getExpectedData(spd15);
-    sdr = shuffleBuffer.getShuffleData(spd14.getBlockList()[0].getBlockId(), 
10);
+    sdr = getShuffleData(shuffleBuffer, spd14.getBlockList()[0].getBlockId(), 
10, spd15);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 2, 1);
     assertArrayEquals(expectedData, sdr.getData());
@@ -563,7 +574,7 @@ public class ShuffleBufferTest extends BufferTestBase {
         shuffleBuffer.getInFlushBlockMap().get(event4.getEventId()));
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
     expectedData = getExpectedData(spd12, spd13, spd14, spd15);
-    sdr = shuffleBuffer.getShuffleData(spd11.getBlockList()[0].getBlockId(), 
50);
+    sdr = getShuffleData(shuffleBuffer, spd11.getBlockList()[0].getBlockId(), 
50, spd12, spd13, spd14, spd15);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 2, 4);
     assertArrayEquals(expectedData, sdr.getData());
@@ -577,20 +588,24 @@ public class ShuffleBufferTest extends BufferTestBase {
     expectedBlocks.addAll(shuffleBuffer.getBlocks());
     expectedData = getExpectedData(spd1, spd2, spd3, spd4, spd5, spd6, spd7, 
spd8, spd9,
         spd10, spd11, spd12, spd13, spd14, spd15);
-    sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 220);
+    sdr = getShuffleData(shuffleBuffer, Constants.INVALID_BLOCK_ID, 220,
+        spd1, spd2, spd3, spd4, spd5, spd6, spd7, spd8, spd9,
+        spd10, spd11, spd12, spd13, spd14, spd15);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 0, 15);
     assertArrayEquals(expectedData, sdr.getData());
 
     // case7 after get spd15
-    sdr = shuffleBuffer.getShuffleData(spd15.getBlockList()[0].getBlockId(), 
20);
+    sdr = getShuffleData(shuffleBuffer, spd15.getBlockList()[0].getBlockId(), 
20,
+        spd1, spd2, spd3, spd4, spd5, spd6, spd7, spd8, spd9,
+        spd10, spd11, spd12, spd13, spd14, spd15);
     assertEquals(0, sdr.getBufferSegments().size());
 
     // case7 can't find blockId, read from start
     expectedBlocks = Lists.newArrayList(
         shuffleBuffer.getInFlushBlockMap().get(event1.getEventId()));
     expectedData = getExpectedData(spd1, spd2);
-    sdr = shuffleBuffer.getShuffleData(-200, 20);
+    sdr = getShuffleData(shuffleBuffer, -200, 20, spd1, spd2);
     compareBufferSegment(expectedBlocks,
         sdr.getBufferSegments(), 0, 2);
     assertArrayEquals(expectedData, sdr.getData());
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index e8fc4046..fddd1650 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -123,7 +123,10 @@ public class ShuffleHandlerFactory {
         request.getReadBufferSize(),
         shuffleServerClient,
         request.getExpectTaskIds(),
-        request.isExpectedTaskIdsBitmapFilterEnable()
+        request.getExpectBlockIds(),
+        request.getProcessBlockIds(),
+        request.getBlockSkipStrategy(),
+        request.getMaxBlockIdRangeSegments()
     );
     return memoryClientReadHandler;
   }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
index 3cc2e6ba..8c052023 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.storage.handler.impl;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,18 +28,25 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.api.ShuffleServerClient;
 import org.apache.uniffle.client.request.RssGetInMemoryShuffleDataRequest;
 import org.apache.uniffle.client.response.RssGetInMemoryShuffleDataResponse;
+import org.apache.uniffle.common.BlockSkipStrategy;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RssUtils;
 
 public class MemoryClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MemoryClientReadHandler.class);
+  private final Roaring64NavigableMap expectBlockIds;
+  private final Roaring64NavigableMap processBlockIds;
+  private final BlockSkipStrategy blockSkipStrategy;
+  private final int maxBlockIdRangeSegments;
   private long lastBlockId = Constants.INVALID_BLOCK_ID;
   private ShuffleServerClient shuffleServerClient;
   private Roaring64NavigableMap expectTaskIds;
   private boolean expectedTaskIdsBitmapFilterEnable;
+  private List<Long> expectedBlockIdRange = Lists.newArrayList();
 
   // Only for tests
   @VisibleForTesting
@@ -55,8 +63,10 @@ public class MemoryClientReadHandler extends 
AbstractClientReadHandler {
         readBufferSize,
         shuffleServerClient,
         null,
-        false
-    );
+        null,
+        null,
+        BlockSkipStrategy.TASK_BITMAP,
+        0);
   }
 
   public MemoryClientReadHandler(
@@ -66,18 +76,32 @@ public class MemoryClientReadHandler extends 
AbstractClientReadHandler {
       int readBufferSize,
       ShuffleServerClient shuffleServerClient,
       Roaring64NavigableMap expectTaskIds,
-      boolean expectedTaskIdsBitmapFilterEnable) {
+      Roaring64NavigableMap expectBlockIds,
+      Roaring64NavigableMap processBlockIds,
+      BlockSkipStrategy blockSkipStrategy,
+      int maxBlockIdRangeSegments) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionId = partitionId;
     this.readBufferSize = readBufferSize;
     this.shuffleServerClient = shuffleServerClient;
     this.expectTaskIds = expectTaskIds;
-    this.expectedTaskIdsBitmapFilterEnable = expectedTaskIdsBitmapFilterEnable;
+    this.expectBlockIds = expectBlockIds;
+    this.processBlockIds = processBlockIds;
+    this.blockSkipStrategy = blockSkipStrategy;
+    this.maxBlockIdRangeSegments = maxBlockIdRangeSegments;
   }
 
   @Override
   public ShuffleDataResult readShuffleData() {
+    if (BlockSkipStrategy.BLOCKID_RANGE.equals(blockSkipStrategy) && 
lastBlockId == Constants.INVALID_BLOCK_ID) {
+      Roaring64NavigableMap bitmap = RssUtils.cloneBitMap(expectBlockIds);
+      bitmap.xor(processBlockIds);
+      expectedBlockIdRange = RssUtils.generateRangeSegments(bitmap, 
maxBlockIdRangeSegments);
+      if (expectedBlockIdRange.size() == 0) {
+        return null;
+      }
+    }
     ShuffleDataResult result = null;
 
     RssGetInMemoryShuffleDataRequest request = new 
RssGetInMemoryShuffleDataRequest(
@@ -86,7 +110,8 @@ public class MemoryClientReadHandler extends 
AbstractClientReadHandler {
         partitionId,
         lastBlockId,
         readBufferSize,
-        expectedTaskIdsBitmapFilterEnable ? expectTaskIds : null
+        expectedTaskIdsBitmapFilterEnable ? expectTaskIds : null,
+        expectedBlockIdRange
     );
 
     try {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
index 58729485..07b2143f 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleReadHandlerRequest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
+import org.apache.uniffle.common.BlockSkipStrategy;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.config.RssBaseConf;
@@ -44,7 +45,8 @@ public class CreateShuffleReadHandlerRequest {
   private Roaring64NavigableMap processBlockIds;
   private ShuffleDataDistributionType distributionType;
   private Roaring64NavigableMap expectTaskIds;
-  private boolean expectedTaskIdsBitmapFilterEnable;
+  private BlockSkipStrategy blockSkipStrategy;
+  private int maxBlockIdRangeSegments;
 
   public CreateShuffleReadHandlerRequest() {
   }
@@ -177,11 +179,19 @@ public class CreateShuffleReadHandlerRequest {
     this.expectTaskIds = expectTaskIds;
   }
 
-  public boolean isExpectedTaskIdsBitmapFilterEnable() {
-    return expectedTaskIdsBitmapFilterEnable;
+  public BlockSkipStrategy getBlockSkipStrategy() {
+    return blockSkipStrategy;
   }
 
-  public void useExpectedTaskIdsBitmapFilter() {
-    this.expectedTaskIdsBitmapFilterEnable = true;
+  public void setBlockSkipStrategy(BlockSkipStrategy blockSkipStrategy) {
+    this.blockSkipStrategy = blockSkipStrategy;
+  }
+
+  public int getMaxBlockIdRangeSegments() {
+    return maxBlockIdRangeSegments;
+  }
+
+  public void setMaxBlockIdRangeSegments(int maxBlockIdRangeSegments) {
+    this.maxBlockIdRangeSegments = maxBlockIdRangeSegments;
   }
 }

Reply via email to