[
https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631643#comment-17631643
]
ASF GitHub Bot commented on HADOOP-15327:
-----------------------------------------
szilard-nemeth commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r1019075009
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,65 +990,84 @@ private List<String> splitMaps(List<String> mapq) {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
- super.channelOpen(ctx, evt);
-
- if ((maxShuffleConnections > 0) && (accepted.size() >=
maxShuffleConnections)) {
+ NettyChannelHelper.channelActive(ctx.channel());
+ int numConnections = activeConnections.incrementAndGet();
+ if ((maxShuffleConnections > 0) && (numConnections >
maxShuffleConnections)) {
LOG.info(String.format("Current number of shuffle connections (%d) is
" +
- "greater than or equal to the max allowed shuffle connections
(%d)",
+ "greater than the max allowed shuffle connections (%d)",
accepted.size(), maxShuffleConnections));
- Map<String, String> headers = new HashMap<String, String>(1);
+ Map<String, String> headers = new HashMap<>(1);
// notify fetchers to backoff for a while before closing the connection
// if the shuffle connection limit is hit. Fetchers are expected to
// handle this notification gracefully, that is, not treating this as a
// fetch failure.
headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
- return;
+ } else {
+ super.channelActive(ctx);
+ accepted.add(ctx.channel());
+ LOG.debug("Added channel: {}, channel id: {}. Accepted number of
connections={}",
+ ctx.channel(), ctx.channel().id(), activeConnections.get());
}
- accepted.add(evt.getChannel());
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ NettyChannelHelper.channelInactive(ctx.channel());
+ super.channelInactive(ctx);
+ int noOfConnections = activeConnections.decrementAndGet();
+ LOG.debug("New value of Accepted number of connections={}",
noOfConnections);
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- HttpRequest request = (HttpRequest) evt.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
- return;
+ Channel channel = ctx.channel();
+ LOG.trace("Executing channelRead, channel id: {}", channel.id());
+ HttpRequest request = (HttpRequest) msg;
+ LOG.debug("Received HTTP request: {}, channel id: {}", request,
channel.id());
+ if (request.method() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
}
// Check whether the shuffle version is compatible
- if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
- request.headers() != null ?
- request.headers().get(ShuffleHeader.HTTP_HEADER_NAME) : null)
- || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
- request.headers() != null ?
- request.headers()
- .get(ShuffleHeader.HTTP_HEADER_VERSION) : null)) {
+ String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
+ String httpHeaderName = ShuffleHeader.HTTP_HEADER_NAME;
Review Comment:
Good catch, fixed. Please check the new code.
I agree with @brumi1024 , the request.headers null check is needed.
Double-checked the original code:
It sent an error with the "Incompatible shuffle request version" message if any of the following held:
1. no request.headers() was present (i.e. it was null)
2. the shuffle version was not equal to ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION
3. the header name was not equal to ShuffleHeader.HTTP_HEADER_NAME
> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
> Key: HADOOP-15327
> URL: https://issues.apache.org/jira/browse/HADOOP-15327
> Project: Hadoop Common
> Issue Type: Sub-task
> Reporter: Xiaoyu Yao
> Assignee: Szilard Nemeth
> Priority: Major
> Labels: pull-request-available
> Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch,
> HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch,
> HADOOP-15327.005.patch,
> getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log,
> hades-results-20221108.zip, testfailure-testMapFileAccess-emptyresponse.zip,
> testfailure-testReduceFromPartialMem.zip
>
> Time Spent: 11.5h
> Remaining Estimate: 0h
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]