This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d030caf  MINOR: Use enum for close mode in Selector instead of two booleans (#4559)
d030caf is described below

commit d030caf9561e306556f5e0ab323bb382e967138d
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Tue Feb 13 10:19:33 2018 -0800

    MINOR: Use enum for close mode in Selector instead of two booleans (#4559)
---
 .../org/apache/kafka/common/network/Selector.java  | 45 +++++++++++++---------
 1 file changed, 26 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index ed037b3..1b2d1a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -87,6 +87,18 @@ public class Selector implements Selectable, AutoCloseable {
 
     public static final long NO_IDLE_TIMEOUT_MS = -1;
 
+    private enum CloseMode {
+        GRACEFUL(true),            // process outstanding staged receives, notify disconnect
+        NOTIFY_ONLY(true),         // discard any outstanding receives, notify disconnect
+        DISCARD_NO_NOTIFY(false);  // discard any outstanding receives, no disconnect notification
+
+        boolean notifyDisconnect;
+
+        CloseMode(boolean notifyDisconnect) {
+            this.notifyDisconnect = notifyDisconnect;
+        }
+    }
+
     private final Logger log;
     private final java.nio.channels.Selector nioSelector;
     private final Map<String, KafkaChannel> channels;
@@ -327,7 +339,7 @@ public class Selector implements Selectable, AutoCloseable {
                 channel.state(ChannelState.FAILED_SEND);
                 // ensure notification via `disconnected` when `failedSends` 
are processed in the next poll
                 this.failedSends.add(connectionId);
-                close(channel, false, false);
+                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                 if (!(e instanceof CancelledKeyException)) {
                     log.error("Unexpected exception during send, closing 
connection {} and rethrowing exception {}",
                             connectionId, e);
@@ -507,7 +519,7 @@ public class Selector implements Selectable, AutoCloseable {
 
                 /* cancel any defunct sockets */
                 if (!key.isValid())
-                    close(channel, true, true);
+                    close(channel, CloseMode.GRACEFUL);
 
             } catch (Exception e) {
                 String desc = channel.socketDescription();
@@ -517,7 +529,7 @@ public class Selector implements Selectable, AutoCloseable {
                     log.debug("Connection with {} disconnected due to 
authentication exception", desc, e);
                 else
                     log.warn("Unexpected error from {}; closing connection", 
desc, e);
-                close(channel, !sendFailed, true);
+                close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
             } finally {
                 maybeRecordTimePerConnection(channel, channelStartTimeNanos);
             }
@@ -627,7 +639,7 @@ public class Selector implements Selectable, AutoCloseable {
                     log.trace("About to close the idle connection from {} due 
to being idle for {} millis",
                             connectionId, (currentTimeNanos - 
expiredConnection.getValue()) / 1000 / 1000);
                 channel.state(ChannelState.EXPIRED);
-                close(channel, true, true);
+                close(channel, CloseMode.GRACEFUL);
             }
         }
     }
@@ -681,7 +693,7 @@ public class Selector implements Selectable, AutoCloseable {
             // There is no disconnect notification for local close, but 
updating
             // channel state here anyway to avoid confusion.
             channel.state(ChannelState.LOCAL_CLOSE);
-            close(channel, false, false);
+            close(channel, CloseMode.DISCARD_NO_NOTIFY);
         } else {
             KafkaChannel closingChannel = this.closingChannels.remove(id);
             // Close any closing channel, leave the channel in the state in 
which closing was triggered
@@ -692,20 +704,15 @@ public class Selector implements Selectable, AutoCloseable {
 
     /**
      * Begin closing this connection.
+     * If 'closeMode' is `CloseMode.GRACEFUL`, the channel is disconnected 
here, but staged receives
+     * are processed. The channel is closed when there are no outstanding 
receives or if a send is
+     * requested. For other values of `closeMode`, outstanding receives are 
discarded and the channel
+     * is closed immediately.
      *
-     * If 'processOutstanding' is true, the channel is disconnected here, but 
staged receives are
-     * processed. The channel is closed when there are no outstanding receives 
or if a send
-     * is requested. The channel will be added to disconnect list when it is 
actually closed.
-     *
-     * If 'processOutstanding' is false, outstanding receives are discarded 
and the channel is
-     * closed immediately. The channel will not be added to disconnected list 
and it is the
-     * responsibility of the caller to handle disconnect notifications.
+     * The channel will be added to disconnect list when it is actually closed 
if `closeMode.notifyDisconnect`
+     * is true.
      */
-    private void close(KafkaChannel channel, boolean processOutstanding, 
boolean notifyDisconnect) {
-
-        if (processOutstanding && !notifyDisconnect)
-            throw new IllegalStateException("Disconnect notification required 
for remote disconnect after processing outstanding requests");
-
+    private void close(KafkaChannel channel, CloseMode closeMode) {
         channel.disconnect();
 
         // Ensure that `connected` does not have closed channels. This could 
happen if `prepare` throws an exception
@@ -719,12 +726,12 @@ public class Selector implements Selectable, AutoCloseable {
         // a send fails or all outstanding receives are processed. Mute state 
of disconnected channels
         // are tracked to ensure that requests are processed one-by-one by the 
broker to preserve ordering.
         Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
-        if (processOutstanding && deque != null && !deque.isEmpty()) {
+        if (closeMode == CloseMode.GRACEFUL && deque != null && !deque.isEmpty()) {
             // stagedReceives will be moved to completedReceives later along 
with receives from other channels
             closingChannels.put(channel.id(), channel);
             log.debug("Tracking closing connection {} to process outstanding 
requests", channel.id());
         } else
-            doClose(channel, notifyDisconnect);
+            doClose(channel, closeMode.notifyDisconnect);
         this.channels.remove(channel.id());
 
         if (idleExpiryManager != null)

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to