SteNicholas commented on code in PR #2820:
URL: https://github.com/apache/celeborn/pull/2820#discussion_r1816393296
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java:
##########
@@ -158,11 +171,55 @@ private void addCredit(MapPartitionData mapPartitionData, int numCredit, long st
}
}
+ private void notifyRequiredSegment(
+ MapPartitionData mapPartitionData, int requiredSegmentId, long streamId,
int subPartitionId) {
+ logger.debug(
+ "Receive RequiredSegment from client, streamId: {}, requiredSegmentId: {}, subPartitionId: {}",
+ streamId,
+ requiredSegmentId,
+ subPartitionId);
+ try {
+ if (mapPartitionData != null) {
+ checkState(mapPartitionData instanceof SegmentMapPartitionData);
+ ((SegmentMapPartitionData) mapPartitionData)
+ .notifyRequiredSegmentId(requiredSegmentId, streamId,
subPartitionId);
+ }
+ } catch (Throwable e) {
+ logger.error(
+ String.format("Fail to notify segmentId %s for stream %s.",
requiredSegmentId, streamId),
+ e);
+ throw e;
+ }
+ }
+
public void addCredit(int numCredit, long streamId) {
+ if (!streams.containsKey(streamId)) {
+ // In flink hybrid shuffle integration strategy, the stream may release
in worker before
+ // client receive bufferStreamEnd,
+ // and the client may send request with old streamId, so ignore
non-exist streams.
+ logger.warn("Ignore AddCredit from stream {}, numCredit {}.", streamId,
numCredit);
+ return;
+ }
MapPartitionData mapPartitionData =
streams.get(streamId).getMapDataPartition();
addCredit(mapPartitionData, numCredit, streamId);
}
+ public void notifyRequiredSegment(int requiredSegmentId, long streamId, int
subPartitionId) {
+ if (!streams.containsKey(streamId)) {
+ // In flink hybrid shuffle integration strategy, the stream may release
in worker before
+ // client receive bufferStreamEnd,
+ // and the client may send request with old streamId, so ignore
non-exist streams.
+ logger.warn(
+ "Ignore RequiredSegment from stream {}, subPartition {}, segmentId {}.",
+ streamId,
+ subPartitionId,
+ requiredSegmentId);
+ return;
+ }
+ MapPartitionData mapPartitionData =
streams.get(streamId).getMapDataPartition();
Review Comment:
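This should check whether `streams.get(streamId)` returns null; otherwise,
calling `getMapDataPartition()` on it may throw an NPE. Even after the
`containsKey` check, the stream could be removed before this `get` call.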
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]