This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8d0b4cf4c [CELEBORN-1506][BUG] Revert "[CELEBORN-1036][FOLLOWUP]
totalInflightReqs should decrement when batchIdSet contains the batchId to
avoid duplicate caller of removeBatch"
8d0b4cf4c is described below
commit 8d0b4cf4cdee77ded00e7a981e978463d27c34a4
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Sat Jul 13 13:09:46 2024 +0800
[CELEBORN-1506][BUG] Revert "[CELEBORN-1036][FOLLOWUP] totalInflightReqs
should decrement when batchIdSet contains the batchId to avoid duplicate caller
of removeBatch"
### What changes were proposed in this pull request?
One of our users reported a dataloss issue in
https://github.com/apache/celeborn/pull/2612 , I tried to reproduce
the bug with the following setup:
1. Partition data is far larger than
`spark.celeborn.client.shuffle.partitionSplit.threshold`, which means split
happens very often
2. `spark.celeborn.client.shuffle.partitionSplit.threshold` is larger than
`celeborn.worker.shuffle.partitionSplit.max`, which means when split happens,
it is `HARD_SPLIT`
3. `celeborn.client.shuffle.batchHandleChangePartition.enabled` is true,
which means when hard split happens, `LifecycleManager` will commit the splits
before the stage finishes
Configs in spark side:
```
spark.celeborn.client.push.maxReqsInFlight.perWorker | 256
spark.celeborn.client.push.maxReqsInFlight.total | 2048
spark.celeborn.client.shuffle.batchHandleCommitPartition.enabled | true
spark.celeborn.client.shuffle.compression.codec | zstd
spark.celeborn.client.shuffle.partitionSplit.threshold | 48m
spark.celeborn.client.spark.fetch.throwsFetchFailure | true
spark.celeborn.client.spark.push.sort.memory.adaptiveThreshold | true
spark.celeborn.client.spark.push.sort.memory.threshold | 512m
spark.celeborn.client.spark.shuffle.writer | sort
spark.celeborn.master.endpoints | master-1-1:9097
```
Configs in celeborn side:
```
celeborn.metrics.enabled=false
celeborn.replicate.io.numConnectionsPerPeer=24
celeborn.application.heartbeat.timeout=120s
celeborn.worker.storage.dirs=/mnt/disk1,/mnt/disk2
celeborn.network.timeout=2000s
celeborn.ha.enabled=false
celeborn.worker.closeIdleConnections=true
celeborn.worker.monitor.disk.enabled=false
celeborn.worker.flusher.threads=16
celeborn.worker.graceful.shutdown.enabled=true
celeborn.worker.rpc.port=9100
celeborn.worker.push.port=9101
celeborn.worker.fetch.port=9102
celeborn.worker.replicate.port=9103
celeborn.worker.shuffle.partitionSplit.max=10m // this is made to be small
```
My query on 10T TPCDS:
```
select
max(ss_sold_time_sk ),
max(ss_item_sk ),
max(ss_customer_sk ),
max(ss_cdemo_sk ),
max(ss_hdemo_sk ),
max(ss_addr_sk ),
max(ss_store_sk ),
max(ss_promo_sk ),
max(ss_ticket_number ),
max(ss_quantity ),
max(ss_wholesale_cost ),
max(ss_list_price ),
max(ss_sales_price ),
max(ss_ext_discount_amt ),
max(ss_ext_sales_price ),
max(ss_ext_wholesale_cost),
max(ss_ext_list_price ),
max(ss_ext_tax ),
max(ss_coupon_amt ),
max(ss_net_paid ),
max(ss_net_paid_inc_tax ),
max(ss_net_profit ),
max(ss_sold_date_sk )
from (
select * from store_sales where ss_sold_date_sk is not null distribute by
ss_sold_date_sk
) a;
```
After digging into it, I found the bug is introduced by
https://github.com/apache/celeborn/pull/2134 . #2134 added
check in `InFlightRequestTracker#addBatch` and
`InFlightRequestTracker#removeBatch` and only
increments/decrements `totalInflightReqs` when `batchIdSet` contains
current `batchId`, which conflicts with
`ShuffleClientImpl#PushDataRpcResponseCallback#updateLatestPartition`,
which calls `addBatch` first then calls
`removeBatch` with the same batchId. As a result, the call to `addBatch`
fails to increment `totalInflightReqs`, but
the call to `removeBatch` decrements `totalInflightReqs`, which means the
retried push is not counted, then later
`limitZeroInFlight` in `mapperEnd` will return even though the retried push
fails.
This PR fixes the bug by reverting #2134
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test.
Closes #2621 from waitinfuture/1506.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/common/write/InFlightRequestTracker.java | 18 ++++++------------
1 file changed, 6 insertions(+), 12 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
index 6e11aa501..cab0a3624 100644
---
a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
+++
b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
@@ -58,28 +58,22 @@ public class InFlightRequestTracker {
}
public void addBatch(int batchId, String hostAndPushPort) {
- Set<Integer> batchIdSet =
+ Set<Integer> batchIdSetPerPair =
inflightBatchesPerAddress.computeIfAbsent(
hostAndPushPort, id -> ConcurrentHashMap.newKeySet());
- if (batchIdSet.add(batchId)) {
- totalInflightReqs.increment();
- } else {
- logger.debug("{} has already been inflight.", batchId);
- }
+ batchIdSetPerPair.add(batchId);
+ totalInflightReqs.increment();
}
public void removeBatch(int batchId, String hostAndPushPort) {
Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
// TODO: Need to debug why batchIdSet will be null.
if (batchIdSet != null) {
- if (batchIdSet.remove(batchId)) {
- totalInflightReqs.decrement();
- } else {
- logger.debug("BatchIdSet has removed {}.", batchId);
- }
+ batchIdSet.remove(batchId);
} else {
logger.warn("BatchIdSet of {} is null.", hostAndPushPort);
}
+ totalInflightReqs.decrement();
}
public void onSuccess(String hostAndPushPort) {
@@ -103,7 +97,7 @@ public class InFlightRequestTracker {
pushStrategy.limitPushSpeed(pushState, hostAndPushPort);
int currentMaxReqsInFlight =
pushStrategy.getCurrentMaxReqsInFlight(hostAndPushPort);
- Set<Integer> batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
+ Set batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
long times = waitInflightTimeoutMs / delta;
try {
while (times > 0) {