This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new d030caf  MINOR: Use enum for close mode in Selector instead of two booleans (#4559)
d030caf is described below

commit d030caf9561e306556f5e0ab323bb382e967138d
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Tue Feb 13 10:19:33 2018 -0800

    MINOR: Use enum for close mode in Selector instead of two booleans (#4559)
---
 .../org/apache/kafka/common/network/Selector.java  | 45 +++++++++++++---------
 1 file changed, 26 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index ed037b3..1b2d1a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -87,6 +87,18 @@ public class Selector implements Selectable, AutoCloseable {

     public static final long NO_IDLE_TIMEOUT_MS = -1;

+    private enum CloseMode {
+        GRACEFUL(true),            // process outstanding staged receives, notify disconnect
+        NOTIFY_ONLY(true),         // discard any outstanding receives, notify disconnect
+        DISCARD_NO_NOTIFY(false);  // discard any outstanding receives, no disconnect notification
+
+        boolean notifyDisconnect;
+
+        CloseMode(boolean notifyDisconnect) {
+            this.notifyDisconnect = notifyDisconnect;
+        }
+    }
+
     private final Logger log;
     private final java.nio.channels.Selector nioSelector;
     private final Map<String, KafkaChannel> channels;
@@ -327,7 +339,7 @@ public class Selector implements Selectable, AutoCloseable {
                 channel.state(ChannelState.FAILED_SEND);
                 // ensure notification via `disconnected` when `failedSends` are processed in the next poll
                 this.failedSends.add(connectionId);
-                close(channel, false, false);
+                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                 if (!(e instanceof CancelledKeyException)) {
                     log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                             connectionId, e);
@@ -507,7 +519,7 @@ public class Selector implements Selectable, AutoCloseable {

                 /* cancel any defunct sockets */
                 if (!key.isValid())
-                    close(channel, true, true);
+                    close(channel, CloseMode.GRACEFUL);

             } catch (Exception e) {
                 String desc = channel.socketDescription();
@@ -517,7 +529,7 @@ public class Selector implements Selectable, AutoCloseable {
                     log.debug("Connection with {} disconnected due to authentication exception", desc, e);
                 else
                     log.warn("Unexpected error from {}; closing connection", desc, e);
-                close(channel, !sendFailed, true);
+                close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
             } finally {
                 maybeRecordTimePerConnection(channel, channelStartTimeNanos);
             }
@@ -627,7 +639,7 @@ public class Selector implements Selectable, AutoCloseable {
                         log.trace("About to close the idle connection from {} due to being idle for {} millis",
                                 connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
                     channel.state(ChannelState.EXPIRED);
-                    close(channel, true, true);
+                    close(channel, CloseMode.GRACEFUL);
                 }
             }
         }
@@ -681,7 +693,7 @@ public class Selector implements Selectable, AutoCloseable {
             // There is no disconnect notification for local close, but updating
             // channel state here anyway to avoid confusion.
             channel.state(ChannelState.LOCAL_CLOSE);
-            close(channel, false, false);
+            close(channel, CloseMode.DISCARD_NO_NOTIFY);
         } else {
             KafkaChannel closingChannel = this.closingChannels.remove(id);
             // Close any closing channel, leave the channel in the state in which closing was triggered
@@ -692,20 +704,15 @@ public class Selector implements Selectable, AutoCloseable {

     /**
      * Begin closing this connection.
+     * If 'closeMode' is `CloseMode.GRACEFUL`, the channel is disconnected here, but staged receives
+     * are processed. The channel is closed when there are no outstanding receives or if a send is
+     * requested. For other values of `closeMode`, outstanding receives are discarded and the channel
+     * is closed immediately.
      *
-     * If 'processOutstanding' is true, the channel is disconnected here, but staged receives are
-     * processed. The channel is closed when there are no outstanding receives or if a send
-     * is requested. The channel will be added to disconnect list when it is actually closed.
-     *
-     * If 'processOutstanding' is false, outstanding receives are discarded and the channel is
-     * closed immediately. The channel will not be added to disconnected list and it is the
-     * responsibility of the caller to handle disconnect notifications.
+     * The channel will be added to disconnect list when it is actually closed if `closeMode.notifyDisconnect`
+     * is true.
      */
-    private void close(KafkaChannel channel, boolean processOutstanding, boolean notifyDisconnect) {
-
-        if (processOutstanding && !notifyDisconnect)
-            throw new IllegalStateException("Disconnect notification required for remote disconnect after processing outstanding requests");
-
+    private void close(KafkaChannel channel, CloseMode closeMode) {
         channel.disconnect();

         // Ensure that `connected` does not have closed channels. This could happen if `prepare` throws an exception
@@ -719,12 +726,12 @@ public class Selector implements Selectable, AutoCloseable {
         // a send fails or all outstanding receives are processed. Mute state of disconnected channels
         // are tracked to ensure that requests are processed one-by-one by the broker to preserve ordering.
         Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
-        if (processOutstanding && deque != null && !deque.isEmpty()) {
+        if (closeMode == CloseMode.GRACEFUL && deque != null && !deque.isEmpty()) {
             // stagedReceives will be moved to completedReceives later along with receives from other channels
             closingChannels.put(channel.id(), channel);
             log.debug("Tracking closing connection {} to process outstanding requests", channel.id());
         } else
-            doClose(channel, notifyDisconnect);
+            doClose(channel, closeMode.notifyDisconnect);
         this.channels.remove(channel.id());

         if (idleExpiryManager != null)

--
To stop receiving notification emails like this one, please contact
j...@apache.org.