Repository: kudu
Updated Branches:
  refs/heads/master f24307db4 -> 72895966c


java: avoid spewing ClosedChannelException on client-initiated disconnects

When submitting an MR job, the submitting program opens a client
instance for a short amount of time, just to fetch an authentication
token. When it then calls 'close()', any TabletClient connections that
the client may still have open get disconnected, and that was causing an
exception to be logged as follows:

17/03/07 13:44:51 ERROR client.TabletClient: [Peer ] Unexpected exception from downstream on [id: 0x06a6a87b, /172.31.112.110:41188 :> kudu-security-1.gce.cloudera.com/172.31.112.110:7051]
java.nio.channels.ClosedChannelException
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:84)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.write(Channels.java:725)
        at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71)
        at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.write(Channels.java:704)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.write(Channels.java:671)
        at org.apache.kudu.client.Negotiator.sendSaslMessage(Negotiator.java:218)
        at org.apache.kudu.client.Negotiator.sendTunneledTls(Negotiator.java:515)
        at org.apache.kudu.client.Negotiator.sendPendingOutboundTls(Negotiator.java:505)
        at org.apache.kudu.client.Negotiator.handleTlsMessage(Negotiator.java:451)
        at org.apache.kudu.client.Negotiator.handleResponse(Negotiator.java:250)
        at org.apache.kudu.client.Negotiator.messageReceived(Negotiator.java:229)

This patch adds a flag in TabletClient which keeps track of the fact that
the disconnection was requested rather than unexpected, and in that case
avoids logging anything.

A new test reproduces the same scenario (a client that is closed shortly
after connecting), captures logs, and makes sure that no exceptions are
in the logs.

Change-Id: I4e940d821c7d3f670c5a6b7407385952dc9debfc
Reviewed-on: http://gerrit.cloudera.org:8080/6303
Reviewed-by: Jean-Daniel Cryans <[email protected]>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5566bc90
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5566bc90
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5566bc90

Branch: refs/heads/master
Commit: 5566bc902ef94a9ce825f7c2f939cf87428941df
Parents: f24307d
Author: Todd Lipcon <[email protected]>
Authored: Tue Mar 7 15:26:10 2017 -0800
Committer: Todd Lipcon <[email protected]>
Committed: Wed Mar 8 00:34:29 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/TabletClient.java    | 13 ++++
 .../org/apache/kudu/client/TestKuduClient.java  | 25 +++++++
 .../apache/kudu/util/CapturingLogAppender.java  | 79 ++++++++++++++++++++
 3 files changed, 117 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5566bc90/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 4d44406..e26e883 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -26,6 +26,7 @@
 
 package org.apache.kudu.client;
 
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -144,6 +145,13 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
 
   private final ServerInfo serverInfo;
 
+  /**
+   * Set to true when the client initiates a disconnect. The channelDisconnected
+   * event handler then knows not to log any warning about unexpected disconnection
+   * from the peer.
+   */
+  private volatile boolean closedByClient;
+
   public TabletClient(AsyncKuduClient client, ServerInfo serverInfo) {
     this.kuduClient = client;
     this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
@@ -293,6 +301,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
     // added to a ChannelPipeline, which synchronously fires the channelOpen()
     // event.
     Preconditions.checkNotNull(chan);
+    closedByClient = true;
     return Channels.disconnect(chan);
   }
 
@@ -689,6 +698,10 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
           " ignore this if we're shutting down", e);
     } else if (e instanceof ReadTimeoutException) {
       LOG.debug(getPeerUuidLoggingString() + "Encountered a read timeout, will close the channel");
+    } else if (e instanceof ClosedChannelException) {
+      if (!closedByClient) {
+        LOG.info(getPeerUuidLoggingString() + "Lost connection to peer");
+      }
     } else {
       LOG.error(getPeerUuidLoggingString() + "Unexpected exception from downstream on " + c, e);
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5566bc90/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 488d7ea..6d3be3e 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -24,6 +24,7 @@ import static org.apache.kudu.client.RowResult.timestampToString;
 import static org.junit.Assert.*;
 import static org.junit.matchers.JUnitMatchers.containsString;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -34,12 +35,15 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
+import org.apache.kudu.util.CapturingLogAppender;
+import org.apache.log4j.AppenderSkeleton;
 
 public class TestKuduClient extends BaseKuduTest {
   private static final Logger LOG = LoggerFactory.getLogger(TestKuduClient.class);
@@ -786,6 +790,25 @@ public class TestKuduClient extends BaseKuduTest {
     assertEquals(1, countRowsInScan(scanner));
   }
 
+  /**
+   * Regression test for some log spew which occurred in short-lived client instances which
+   * had outbound connections.
+   */
+  @Test(timeout = 100000)
+  public void testCloseShortlyAfterOpen() throws Exception {
+    CapturingLogAppender cla = new CapturingLogAppender();
+    try (Closeable c = cla.attach()) {
+      try (KuduClient localClient = new KuduClient.KuduClientBuilder(masterAddresses).build()) {
+        // Force the client to connect to the masters.
+        localClient.exportAuthenticationCredentials();
+      }
+      // Wait a little bit since the "channel disconnected" exceptions could come
+      // from threads that don't get synchronously joined by client.close().
+      Thread.sleep(500);
+    }
+    assertFalse(cla.getAppendedText(), cla.getAppendedText().contains("Exception"));
+  }
+
   @Test(timeout = 100000)
   public void testCustomNioExecutor() throws Exception {
     long startTime = System.nanoTime();
@@ -829,4 +852,6 @@ public class TestKuduClient extends BaseKuduTest {
   public void testNoDefaultPartitioning() throws Exception {
     syncClient.createTable(tableName, basicSchema, new CreateTableOptions());
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5566bc90/java/kudu-client/src/test/java/org/apache/kudu/util/CapturingLogAppender.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/CapturingLogAppender.java b/java/kudu-client/src/test/java/org/apache/kudu/util/CapturingLogAppender.java
new file mode 100644
index 0000000..3d2d5b6
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/util/CapturingLogAppender.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Test utility which wraps Log4j and captures all messages logged
+ * while it is attached. This can be useful for asserting that a particular
+ * message is (or is not) logged.
+ */
+public class CapturingLogAppender extends AppenderSkeleton {
+  private StringBuilder appended = new StringBuilder();
+  private static final Layout layout = new SimpleLayout();
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+
+  @Override
+  protected void append(LoggingEvent event) {
+    appended.append(layout.format(event));
+    if (event.getThrowableInformation() != null) {
+      appended.append(Throwables.getStackTraceAsString(
+          event.getThrowableInformation().getThrowable())).append("\n");
+    }
+  }
+
+  public String getAppendedText() {
+    return appended.toString();
+  }
+
+  /**
+   * Temporarily attach the capturing appender to the Log4j root logger.
+   * This can be used in a 'try-with-resources' block:
+   * <code>
+   *   try (Closeable c = capturer.attach()) {
+   *     ...
+   *   }
+   * </code>
+   */
+  public Closeable attach() {
+    Logger.getRootLogger().addAppender(this);
+    return new Closeable() {
+      @Override
+      public void close() throws IOException {
+        Logger.getRootLogger().removeAppender(CapturingLogAppender.this);
+      }
+    };
+  }
+}

Reply via email to