To be more specific, ShuffleClient.pushData is a synchronous API: the data
is sent to the wire before it returns, but the callbacks are handled
asynchronously in Netty's thread pool.
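
To make the synchronous/asynchronous split concrete, here is a minimal Java
model. Everything in it (the class, the in-memory `wire` list, and
`callbackPool`) is a hypothetical stand-in, not Celeborn's actual
ShuffleClient or Netty code. It only shows that the data is sent before
pushData returns, while the response callback runs later on a pool thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model, NOT Celeborn's real ShuffleClient API: it only mimics
// "send synchronously, handle the response callback on another thread".
class SyncPushAsyncCallback {
    // Stands in for the network: a payload counts as "on the wire" once added.
    static final List<String> wire = new CopyOnWriteArrayList<>();

    // Stands in for Netty's event-loop threads that run response callbacks.
    // Daemon threads, so the JVM can exit without an explicit shutdown.
    static final ExecutorService callbackPool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "callback-thread");
        t.setDaemon(true);
        return t;
    });

    // Synchronous part: the payload is written before this method returns.
    // Asynchronous part: onResponse runs later on a callbackPool thread.
    static void pushData(String payload, Consumer<String> onResponse) {
        wire.add(payload);
        callbackPool.submit(() -> onResponse.accept("ACK " + payload));
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        pushData("block-0", resp -> {
            System.out.println(Thread.currentThread().getName() + " handled " + resp);
            done.countDown();
        });
        // Prints true: the data was sent before pushData returned,
        // whether or not the callback has run yet.
        System.out.println("on wire after return: " + wire.contains("block-0"));
        done.await(5, TimeUnit.SECONDS);
    }
}
```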

If the callback indicates that the pushed data should be retried (for
example, on HARD_SPLIT or a push failure), the callback thread first
checks whether the shuffle ID that the data belongs to has already ended.
If not, the callback thread sends a Revive request to the LifecycleManager,
which prepares new workers for that partition ID.

After that, the callback thread submits a retry push task to
ShuffleClientImpl's pushDataRetryPool, which re-pushes the failed data to
the new worker.
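
The retry path above can be sketched as a small Java model under
simplified assumptions. `stageEndShuffleSet` and `pushDataRetryPool`
borrow their names from ShuffleClientImpl, but the `Status` enum,
`sendRevive`, and `onPushResponse` are hypothetical and do not match
Celeborn's real classes or signatures. The sketch only shows the order of
operations: check for stage end, send Revive, then hand the re-push to the
retry pool.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the retry path; all names are illustrative
// stand-ins, NOT Celeborn's real classes or method signatures.
class ReviveRetryFlow {
    enum Status { SUCCESS, HARD_SPLIT, PUSH_FAILED }

    // Stand-in for ShuffleClientImpl's stageEndShuffleSet.
    static final Set<Integer> stageEndShuffleSet = ConcurrentHashMap.newKeySet();
    // Stand-in for ShuffleClientImpl's pushDataRetryPool (daemon threads).
    static final ExecutorService pushDataRetryPool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "retry-sender");
        t.setDaemon(true);
        return t;
    });
    static final AtomicInteger revivesSent = new AtomicInteger();
    static final List<String> rePushed = new CopyOnWriteArrayList<>();

    // Stand-in for the Revive RPC: the "LifecycleManager" returns a new worker.
    static String sendRevive(int shuffleId, int partitionId) {
        return "worker-" + revivesSent.incrementAndGet();
    }

    // Runs on a callback thread when a push response arrives.
    // Returns the scheduled retry, or null when no retry is needed.
    static Future<?> onPushResponse(int shuffleId, int partitionId, String data, Status status) {
        if (status == Status.SUCCESS) {
            return null;
        }
        // 1. Skip the retry entirely if this shuffle has already ended.
        if (stageEndShuffleSet.contains(shuffleId)) {
            return null;
        }
        // 2. Ask for a new worker for this partition.
        String newWorker = sendRevive(shuffleId, partitionId);
        // 3. Re-push the failed data to the new worker from the retry pool.
        return pushDataRetryPool.submit(() -> {
            rePushed.add(data + " -> " + newWorker);
        });
    }

    public static void main(String[] args) throws Exception {
        onPushResponse(7, 0, "block-a", Status.HARD_SPLIT).get(); // wait for re-push
        stageEndShuffleSet.add(7); // mark shuffle 7 as ended
        Future<?> second = onPushResponse(7, 1, "block-b", Status.PUSH_FAILED);
        // prints: second retry scheduled: false
        System.out.println("second retry scheduled: " + (second != null));
        // prints: revives=1 rePushed=[block-a -> worker-1]
        System.out.println("revives=" + revivesSent.get() + " rePushed=" + rePushed);
    }
}
```

In this model, once a shuffle ID is in the set, later failed pushes for it
return before any Revive is sent.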

So, IMO, it's normal for the driver to receive lots of Revive requests.
But once the shuffle ID is added to ShuffleClientImpl's
stageEndShuffleSet, ShuffleClient should stop sending these requests.

Thanks,
Keyong Zhou

Keyong Zhou <[email protected]> wrote on Tue, Aug 1, 2023 at 11:40:

> Hi Sungwoo,
>
> Thanks for your email, and apologies for the late reply :P
>
> For your questions:
>
> 1. No, we can't. The thread pool is stopped by calling
> ShuffleClient.shutdown(). In addition, the thread pool is shared by all
> shuffle IDs, so if you shut it down, the ShuffleClient will not work
> properly for other shuffles.
>
> 2. You can try adding the shuffle ID to ShuffleClientImpl's
> stageEndShuffleSet. Currently ShuffleClient does not have an API like
> `setStageEnd`, but I think it's fine to add one. Let me know if you are
> interested in sending a PR :)
>
> BTW, if you are using v0.3.0-incubating, I recommend patching in the
> following PR, which is related to the StageEnd logic:
> https://github.com/apache/incubator-celeborn/pull/1755
>
> Thanks,
> Keyong Zhou
>
>
>
> <[email protected]> wrote on Mon, Jul 31, 2023 at 10:54:
>
>> Hi Celeborn team,
>>
>> We are implementing a Celeborn-MR3 client and have a question about how
>> to properly unregister a shuffle ID via ShuffleClient. Here is a
>> description of the problem.
>>
>> 1. Suppose that several ShuffleClients are pushing data for a common
>> shuffle ID.
>>
>> 2. For some reason (e.g., a Hive query fails due to OutOfMemoryError or
>> some task fails after several attempts), we decide to interrupt all
>> ShuffleClients.
>>
>> 3. Inside the driver, we call ShuffleClient.unregisterShuffle() with
>> isDriver set to true. Inside MR3 workers, we call
>> ShuffleClient.unregisterShuffle() with isDriver set to false, as well as
>> ShuffleClient.cleanup().
>>
>> Outcome:
>>
>> Inside workers, data push threads continue to run. As a result, the
>> driver keeps receiving revive requests due to HARD_SPLIT.
>>
>> Question:
>>
>> 1. Can we stop data push threads (e.g., celeborn-retry-sender-6) when we
>> call ShuffleClient.unregisterShuffle()?
>>
>> 2. What is the correct way to stop ShuffleClient for a given shuffle ID?
>> In our experiment, the driver prints thousands of revive requests, and we
>> are not sure if this is normal behavior.
>>
>> Any comments or suggestions would be very much appreciated. Thank you.
>>
>> --- Sungwoo
>>
>>
