This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 57dda5d KUDU-1438: [java] Upgrade to Netty 4
57dda5d is described below
commit 57dda5d4868d29f68de4aa0ac516ca390333e6be
Author: Grant Henke <[email protected]>
AuthorDate: Wed Jan 22 14:24:03 2020 -0600
KUDU-1438: [java] Upgrade to Netty 4
This patch upgrades from Netty 3 to the latest Netty 4.
Below is a list of some of the changes required:
- Changed org.jboss.netty imports to io.netty
- Adjust shading to relocate io.netty
- Removed outdated reflection in Bytes.java
- Changed ChannelBuffer to ByteBuffer
- Changed OneToOneDecoder to BytesToMessageDecoder
- Changed OneToOneEncoder to MessageToByteEncoder
- Changed DecoderEmbedder to EmbeddedChannel
- Changed Negotiator and Connection to extend
SimpleChannelInboundHandler instead of
SimpleChannelUpstreamHandler
- Worked around non-sharable SslHandler
- Migrated from NioClientSocketChannelFactory to Netty Boostrap
- Removed boss threads and deprecated setter methods in the
AsyncKuduClient
- Removed ShutdownThread in AsyncKuduClient
- Updated Negotiator tests to unwrap SSL messages
- Added build flag to run tests with paranoid level leak detection
- Suppressed errorprone FutureReturnValueIgnored warnings
I used YCSB to performance test this change and ensure there
isn’t a large performance regression. On an m4.2xlarge instance
I ran a test of 5 iterations of workload a, b, and c using 1M records
and 1M operations. The difference between runs was negligible
(< 1%) and within test variance.
Change-Id: Ic75bd15a982187039ff2e1510af9390d304f7626
Reviewed-on: http://gerrit.cloudera.org:8080/15136
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
---
java/gradle/dependencies.gradle | 4 +-
java/gradle/shadow.gradle | 2 +-
java/gradle/tests.gradle | 5 +
.../org/apache/kudu/client/AlterTableRequest.java | 2 +-
.../org/apache/kudu/client/AsyncKuduClient.java | 116 ++++++-----
.../org/apache/kudu/client/AsyncKuduSession.java | 4 +-
.../main/java/org/apache/kudu/client/Batch.java | 2 +-
.../main/java/org/apache/kudu/client/Bytes.java | 93 +--------
.../java/org/apache/kudu/client/CallResponse.java | 40 ++--
.../org/apache/kudu/client/ConnectToCluster.java | 2 +-
.../apache/kudu/client/ConnectToMasterRequest.java | 2 +-
.../java/org/apache/kudu/client/Connection.java | 198 +++++++++----------
.../org/apache/kudu/client/ConnectionCache.java | 19 +-
.../org/apache/kudu/client/CreateTableRequest.java | 2 +-
.../org/apache/kudu/client/DeleteTableRequest.java | 2 +-
.../kudu/client/GetTableLocationsRequest.java | 2 +-
.../apache/kudu/client/GetTableSchemaRequest.java | 2 +-
.../kudu/client/GetTableStatisticsRequest.java | 2 +-
.../kudu/client/IsAlterTableDoneRequest.java | 2 +-
.../kudu/client/IsCreateTableDoneRequest.java | 2 +-
.../java/org/apache/kudu/client/KuduClient.java | 36 +++-
.../main/java/org/apache/kudu/client/KuduRpc.java | 38 ++--
.../org/apache/kudu/client/ListTablesRequest.java | 2 +-
.../kudu/client/ListTabletServersRequest.java | 2 +-
.../org/apache/kudu/client/ListTabletsRequest.java | 2 +-
.../java/org/apache/kudu/client/Negotiator.java | 218 ++++++++++++---------
.../java/org/apache/kudu/client/Operation.java | 2 +-
.../java/org/apache/kudu/client/PingRequest.java | 2 +-
.../org/apache/kudu/client/RpcOutboundMessage.java | 24 +--
.../apache/kudu/client/SplitKeyRangeRequest.java | 2 +-
.../main/java/org/apache/kudu/util/NetUtil.java | 1 -
.../test/java/org/apache/kudu/client/ITClient.java | 1 +
.../apache/kudu/client/ITScannerMultiTablet.java | 1 +
.../apache/kudu/client/TestAsyncKuduClient.java | 1 +
.../apache/kudu/client/TestAuthTokenReacquire.java | 3 +-
.../kudu/client/TestAuthnTokenReacquireOpen.java | 1 +
.../apache/kudu/client/TestConnectToCluster.java | 4 +-
.../apache/kudu/client/TestConnectionCache.java | 1 +
.../org/apache/kudu/client/TestNegotiator.java | 142 +++++++++-----
39 files changed, 501 insertions(+), 485 deletions(-)
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 67b42a7..3eaf428 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -47,7 +47,7 @@ versions += [
log4j : "2.11.2",
mockito : "3.1.0",
murmur : "1.0.0",
- netty : "3.10.6.Final",
+ netty : "4.1.44.Final",
osdetector : "1.6.2",
parquet : "1.10.1",
protobuf : "3.10.0",
@@ -100,7 +100,7 @@ libs += [
log4jSlf4jImpl :
"org.apache.logging.log4j:log4j-slf4j-impl:$versions.log4j",
mockitoCore : "org.mockito:mockito-core:$versions.mockito",
murmur : "com.sangupta:murmur:$versions.murmur",
- netty : "io.netty:netty:$versions.netty",
+ netty : "io.netty:netty-all:$versions.netty",
osdetector :
"com.google.gradle:osdetector-gradle-plugin:$versions.osdetector",
parquetHadoop :
"org.apache.parquet:parquet-hadoop:$versions.parquet",
protobufJava :
"com.google.protobuf:protobuf-java:$versions.protobuf",
diff --git a/java/gradle/shadow.gradle b/java/gradle/shadow.gradle
index bb414e5..c09b06f 100644
--- a/java/gradle/shadow.gradle
+++ b/java/gradle/shadow.gradle
@@ -58,7 +58,7 @@ shadowJar {
relocate "org.checkerframework",
"org.apache.kudu.shaded.org.checkerframework"
relocate "org.hamcrest", "org.apache.kudu.shaded.org.hamcrest"
relocate "org.HdrHistogram", "org.apache.kudu.shaded.org.HdrHistogram"
- relocate "org.jboss.netty", "org.apache.kudu.shaded.org.jboss.netty"
+ relocate "io.netty", "org.apache.kudu.shaded.io.netty"
relocate "scopt", "org.apache.kudu.shaded.scopt"
}
diff --git a/java/gradle/tests.gradle b/java/gradle/tests.gradle
index 94a0d49..eaf083b 100644
--- a/java/gradle/tests.gradle
+++ b/java/gradle/tests.gradle
@@ -56,6 +56,11 @@ tasks.withType(Test) {
jvmArgs += "--add-opens=$module=ALL-UNNAMED"
}
}
+ // Enable paranoid Netty leak detection during tests.
+ // https://netty.io/wiki/reference-counted-objects.html#leak-detection-levels
+ if (propertyExists("nettyLeakDetection")) {
+ jvmArgs += "-Dio.netty.leakDetection.level=paranoid"
+ }
// Set a few system properties.
systemProperty "java.awt.headless", true
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
index 0ebb6ba..c323c95 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableRequest.java
@@ -27,8 +27,8 @@ import java.util.List;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.util.Pair;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index e41e6c9..4b40a39 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -56,15 +57,18 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.Timer;
-import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -282,7 +286,7 @@ public class AsyncKuduClient implements AutoCloseable {
*/
static int FETCH_TABLETS_PER_RANGE_LOOKUP = 1000;
- private final ClientSocketChannelFactory channelFactory;
+ private final Bootstrap bootstrap;
/**
* This map contains data cached from calls to the master's
@@ -365,7 +369,7 @@ public class AsyncKuduClient implements AutoCloseable {
private volatile boolean closed;
private AsyncKuduClient(AsyncKuduClientBuilder b) {
- this.channelFactory = b.createChannelFactory();
+ this.bootstrap = b.createBootstrap();
this.masterAddresses = b.masterAddresses;
this.masterTable = new KuduTable(this, MASTER_TABLE_NAME_PLACEHOLDER,
MASTER_TABLE_NAME_PLACEHOLDER, null, null, 1, null);
@@ -377,8 +381,7 @@ public class AsyncKuduClient implements AutoCloseable {
this.requestTracker = new
RequestTracker(UUID.randomUUID().toString().replace("-", ""));
this.securityContext = new SecurityContext();
- this.connectionCache = new ConnectionCache(
- securityContext, timer, channelFactory);
+ this.connectionCache = new ConnectionCache(securityContext, bootstrap);
this.tokenReacquirer = new AuthnTokenReacquirer(this);
this.authzTokenCache = new AuthzTokenCache(this);
}
@@ -2495,30 +2498,17 @@ public class AsyncKuduClient implements AutoCloseable {
public Deferred<ArrayList<Void>> shutdown() {
checkIsClosed();
closed = true;
- // This is part of step 3. We need to execute this in its own thread
- // because Netty gets stuck in an infinite loop if you try to shut it
- // down from within a thread of its own thread pool. They don't want
- // to fix this so as a workaround we always shut Netty's thread pool
- // down from another thread.
- final class ShutdownThread extends Thread {
- ShutdownThread() {
- super("AsyncKuduClient@" + AsyncKuduClient.super.hashCode() + "
shutdown");
- }
-
- @Override
- public void run() {
- // This terminates the Executor.
- channelFactory.releaseExternalResources();
- }
- }
// 3. Release all other resources.
final class ReleaseResourcesCB implements Callback<ArrayList<Void>,
ArrayList<Void>> {
@Override
- public ArrayList<Void> call(final ArrayList<Void> arg) {
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public ArrayList<Void> call(final ArrayList<Void> arg) throws
InterruptedException {
LOG.debug("Releasing all remaining resources");
timer.stop();
- new ShutdownThread().start();
+ // AbstractEventExecutor sets a default `quietPeriod` of 2 seconds and
a 15 second timeout.
+ // We disable to quiet period to prevent resource leaks due to clients
running forever.
+ bootstrap.config().group().shutdownGracefully(0, 15, TimeUnit.SECONDS);
return arg;
}
@@ -2620,7 +2610,6 @@ public class AsyncKuduClient implements AutoCloseable {
@InterfaceStability.Evolving
public static final class AsyncKuduClientBuilder {
private static final int DEFAULT_MASTER_PORT = 7051;
- private static final int DEFAULT_BOSS_COUNT = 1;
private static final int DEFAULT_WORKER_COUNT = 2 *
Runtime.getRuntime().availableProcessors();
private final List<HostAndPort> masterAddresses;
@@ -2629,9 +2618,7 @@ public class AsyncKuduClient implements AutoCloseable {
private final HashedWheelTimer timer =
new HashedWheelTimer(new
ThreadFactoryBuilder().setDaemon(true).build(), 20, MILLISECONDS);
- private Executor bossExecutor;
private Executor workerExecutor;
- private int bossCount = DEFAULT_BOSS_COUNT;
private int workerCount = DEFAULT_WORKER_COUNT;
private boolean statisticsDisabled = false;
@@ -2707,33 +2694,43 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * Set the executors which will be used for the embedded Netty boss and
workers.
+ * @deprecated the bossExecutor is no longer used and will have no effect
if provided
+ */
+ @Deprecated
+ public AsyncKuduClientBuilder nioExecutors(Executor bossExecutor, Executor
workerExecutor) {
+ this.workerExecutor = workerExecutor;
+ return this;
+ }
+
+ /**
+ * Set the executor which will be used for the embedded Netty workers.
+ *
* Optional.
- * If not provided, uses a simple cached threadpool. If either argument is
null,
- * then such a thread pool will be used in place of that argument.
+ * If not provided, uses a simple cached threadpool. If workerExecutor is
null,
+ * then such a thread pool will be used.
* Note: executor's max thread number must be greater or equal to
corresponding
* worker count, or netty cannot start enough threads, and client will get
stuck.
* If not sure, please just use CachedThreadPool.
*/
- public AsyncKuduClientBuilder nioExecutors(Executor bossExecutor, Executor
workerExecutor) {
- this.bossExecutor = bossExecutor;
+ public AsyncKuduClientBuilder nioExecutor(Executor workerExecutor) {
this.workerExecutor = workerExecutor;
return this;
}
/**
- * Set the maximum number of boss threads.
- * Optional.
- * If not provided, 1 is used.
+ * @deprecated the bossExecutor is no longer used and will have no effect
if provided
*/
+ @Deprecated
public AsyncKuduClientBuilder bossCount(int bossCount) {
- Preconditions.checkArgument(bossCount > 0, "bossCount should be greater
than 0");
- this.bossCount = bossCount;
+ LOG.info("bossCount is deprecated");
return this;
}
/**
- * Set the maximum number of worker threads.
+ * Set the maximum number of Netty worker threads.
+ * A worker thread performs non-blocking read and write for one or more
+ * Netty Channels in a non-blocking mode.
+ *
* Optional.
* If not provided, (2 * the number of available processors) is used. If
* this client instance will be used on a machine running many client
@@ -2748,31 +2745,30 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * Creates the channel factory for Netty. The user can specify the
executors, but
+ * Creates the client bootstrap for Netty. The user can specify the
executor, but
* if they don't, we'll use a simple thread pool.
*/
- private NioClientSocketChannelFactory createChannelFactory() {
- Executor boss = bossExecutor;
+ private Bootstrap createBootstrap() {
Executor worker = workerExecutor;
- if (boss == null || worker == null) {
- Executor defaultExec = Executors.newCachedThreadPool(
+ if (worker == null) {
+ worker = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("kudu-nio-%d")
.setDaemon(true)
.build());
- if (boss == null) {
- boss = defaultExec;
- }
- if (worker == null) {
- worker = defaultExec;
- }
}
- // Share the timer with the socket channel factory so that it does not
- // create an internal timer with a non-daemon thread.
- return new NioClientSocketChannelFactory(boss,
- bossCount,
- new NioWorkerPool(worker,
workerCount),
- timer);
+ EventLoopGroup workerGroup = new NioEventLoopGroup(workerCount, worker);
+ Bootstrap b = new Bootstrap();
+ b.group(workerGroup);
+ b.channel(NioSocketChannel.class);
+ b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000);
+ b.option(ChannelOption.TCP_NODELAY, true);
+ // Unfortunately there is no way to override the keep-alive timeout in
+ // Java since the JRE doesn't expose any way to call setsockopt() with
+ // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
+ b.option(ChannelOption.SO_KEEPALIVE, true);
+ b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ return b;
}
/**
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index b9da766..5456c37 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -37,10 +37,10 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
index 9e2c086..124a8ab 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Batch.java
@@ -27,8 +27,8 @@ import com.google.common.base.MoreObjects;
import com.google.common.collect.Iterables;
import com.google.protobuf.Message;
import com.google.protobuf.UnsafeByteOperations;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.WireProtocol.AppStatusPB.ErrorCode;
import org.apache.kudu.client.Statistics.Statistic;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
index 8574abf..d67a440 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Bytes.java
@@ -30,22 +30,17 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Comparator;
import com.google.common.io.BaseEncoding;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.util.CharsetUtil;
import org.apache.kudu.util.DecimalUtil;
import org.apache.kudu.util.Slice;
@@ -404,7 +399,7 @@ public final class Bytes {
* @param buf The buffer to read from.
* @return The integer read.
*/
- static int readVarInt32(final ChannelBuffer buf) {
+ static int readVarInt32(final ByteBuf buf) {
int result = buf.readByte();
if (result >= 0) {
return result;
@@ -984,26 +979,12 @@ public final class Bytes {
* @param buf The (possibly {@code null}) buffer to pretty-print.
* @return The buffer in a pretty-printed string.
*/
- public static String pretty(final ChannelBuffer buf) {
+ public static String pretty(final ByteBuf buf) {
if (buf == null) {
return "null";
}
- byte[] array;
- try {
- if (buf.getClass() != ReplayingDecoderBuffer) {
- array = buf.array();
- } else if (RDB_buf != null) { // Netty 3.5.1 and above.
- array = ((ChannelBuffer) RDB_buf.invoke(buf)).array();
- } else { // Netty 3.5.0 and before.
- final ChannelBuffer wrapped_buf = (ChannelBuffer) RDB_buffer.get(buf);
- array = wrapped_buf.array();
- }
- } catch (UnsupportedOperationException e) {
- return "(failed to extract content of buffer of type " +
- buf.getClass().getName() + ')';
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new AssertionError("Should not happen: " + e);
- }
+ byte[] array = new byte[buf.readableBytes()];
+ buf.getBytes(buf.readerIndex(), array);
return pretty(array);
}
@@ -1020,68 +1001,6 @@ public final class Bytes {
return sb.toString();
}
- // Ugly stuff
- // ----------
- // Background: when using ReplayingDecoder (which makes it easy to deal with
- // unframed RPC responses), the ChannelBuffer we manipulate is in fact a
- // ReplayingDecoderBuffer, a package-private class that Netty uses. This
- // class, for some reason, throws UnsupportedOperationException on its
- // array() method. This method is unfortunately the only way to easily dump
- // the contents of a ChannelBuffer, which is useful for debugging or logging
- // unexpected buffers. An issue (NETTY-346) has been filed to get access to
- // the buffer, but the resolution was useless: instead of making the array()
- // method work, a new internalBuffer() method was added on ReplayingDecoder,
- // which would require that we keep a reference on the ReplayingDecoder all
- // along in order to properly convert the buffer to a string.
- // So we instead use ugly reflection to gain access to the underlying buffer
- // while taking into account that the implementation of Netty has changed
- // over time, so depending which version of Netty we're working with, we do
- // a different hack. Yes this is horrible, but it's for the greater good as
- // this is what allows us to debug unexpected buffers when deserializing RPCs
- // and what's more important than being able to debug unexpected stuff?
- private static final Class<?> ReplayingDecoderBuffer;
- private static final Field RDB_buffer; // For Netty 3.5.0 and before.
- private static final Method RDB_buf; // For Netty 3.5.1 and above.
-
- static {
- try {
- ReplayingDecoderBuffer = Class.forName("org.jboss.netty.handler.codec." +
- "replay.ReplayingDecoderBuffer");
- Field field = AccessController.doPrivileged(new
PrivilegedAction<Field>() {
- @Override
- public Field run() {
- try {
- Field bufferField =
ReplayingDecoderBuffer.getDeclaredField("buffer");
- bufferField.setAccessible(true);
- return bufferField;
- } catch (NoSuchFieldException e) {
- // Ignore. Field has been removed in Netty 3.5.1.
- return null;
- }
- }
- });
- RDB_buffer = field;
- if (field != null) { // Netty 3.5.0 or before.
- RDB_buf = null;
- } else {
- RDB_buf = AccessController.doPrivileged(new PrivilegedAction<Method>()
{
- @Override
- public Method run() {
- try {
- Method bufMethod =
ReplayingDecoderBuffer.getDeclaredMethod("buf");
- bufMethod.setAccessible(true);
- return bufMethod;
- } catch (NoSuchMethodException e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
- } catch (ReflectiveOperationException | RuntimeException e) {
- throw new RuntimeException("static initializer failed", e);
- }
- }
-
// ---------------------- //
// Comparing byte arrays. //
// ---------------------- //
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
index a042622..41fa35d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CallResponse.java
@@ -19,11 +19,11 @@ package org.apache.kudu.client;
import java.util.List;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DefaultByteBufHolder;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import org.apache.kudu.rpc.RpcHeader;
import org.apache.kudu.util.Slice;
@@ -33,8 +33,8 @@ import org.apache.kudu.util.Slice;
* access to sidecars and decoded protobufs from the message.
*/
@InterfaceAudience.Private
-final class CallResponse {
- private final ChannelBuffer buf;
+final class CallResponse extends DefaultByteBufHolder {
+ private final ByteBuf buf;
private final RpcHeader.ResponseHeader header;
private final int totalResponseSize;
@@ -47,11 +47,12 @@ final class CallResponse {
* read from yet, and will only be accessed by this class.
*
* Afterwards, this constructs the RpcHeader from the buffer.
- * @param buf Channel buffer which call response reads from.
+ * @param buf Byte buffer which call response reads from.
* @throws IndexOutOfBoundsException if any length prefix inside the
* response points outside the bounds of the buffer.
*/
- CallResponse(final ChannelBuffer buf) {
+ CallResponse(final ByteBuf buf) {
+ super(buf);
this.buf = buf;
this.totalResponseSize = buf.readableBytes();
@@ -142,7 +143,7 @@ final class CallResponse {
// After checking the length, generates a slice for the next 'length'
// bytes of 'buf'. Advances the buffer's read index by 'length' bytes.
- private static Slice nextBytes(final ChannelBuffer buf, final int length) {
+ private static Slice nextBytes(final ByteBuf buf, final int length) {
byte[] payload;
int offset;
if (buf.hasArray()) { // Zero copy.
@@ -158,17 +159,22 @@ final class CallResponse {
}
/**
- * Netty channel handler which receives incoming frames (ChannelBuffers)
+ * Netty decoder which receives incoming frames (ByteBuf)
* and constructs CallResponse objects.
*/
- static class Decoder extends OneToOneDecoder {
+ static class Decoder extends ByteToMessageDecoder {
+
+ Decoder() {
+ // Only one message is decoded on each read.
+ setSingleDecode(true);
+ }
+
@Override
- protected Object decode(ChannelHandlerContext ctx, Channel channel, Object
message)
- throws Exception {
- if (!(message instanceof ChannelBuffer)) {
- return message;
- }
- return new CallResponse((ChannelBuffer)message);
+ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object>
out) {
+ // Increase the reference count because CallResponse holds onto and uses
the ByteBuf.
+ // https://netty.io/wiki/reference-counted-objects.html
+ msg.retain();
+ out.add(new CallResponse(msg));
}
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index 217b768..429e34e 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -31,8 +31,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
index 2fc41ef..8a0c713 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
@@ -24,8 +24,8 @@ import java.util.Collection;
import java.util.Collections;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
import org.apache.kudu.master.Master.MasterFeatures;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index 69767d1..e712475 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -36,27 +36,24 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DefaultChannelPipeline;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.SocketChannelConfig;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +76,7 @@ import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-class Connection extends SimpleChannelUpstreamHandler {
+class Connection extends SimpleChannelInboundHandler<Object> {
/**
* Authentication credentials policy for negotiating outbound connections.
Some requests
* (e.g. {@link ConnectToMasterRequest}) behave differently depending on the
type of credentials
@@ -105,14 +102,14 @@ class Connection extends SimpleChannelUpstreamHandler {
/** Security context to use for connection negotiation. */
private final SecurityContext securityContext;
- /** Timer to monitor read timeouts for the connection (used by Netty's
ReadTimeoutHandler). */
- private final HashedWheelTimer timer;
-
/** Credentials policy to use when authenticating. */
private final CredentialsPolicy credentialsPolicy;
+ /** The Netty client bootstrap used to configure and initialize a connected
channel. */
+ private final Bootstrap bootstrap;
+
/** The underlying Netty's socket channel. */
- private final SocketChannel channel;
+ private SocketChannel channel;
/**
* Set to true when disconnect initiated explicitly from the client side.
The channelDisconnected
@@ -167,9 +164,9 @@ class Connection extends SimpleChannelUpstreamHandler {
@GuardedBy("lock")
private int nextCallId = 0;
+ /** The future for the connection attempt. Set only once connect() is
called. */
@Nullable
@GuardedBy("lock")
- /** The future for the connection attempt. Set only once connect() is
called. */
private ChannelFuture connectFuture;
/**
@@ -177,8 +174,7 @@ class Connection extends SimpleChannelUpstreamHandler {
*
* @param serverInfo the destination server
* @param securityContext security context to use for connection negotiation
- * @param timer timer to set up read timeout on the corresponding Netty
channel
- * @param channelFactory Netty factory to create corresponding Netty channel
+ * @param bootstrap Netty bootstrap to create corresponding Netty channel
* @param credentialsPolicy policy controlling which credentials to use
while negotiating on the
* connection to the target server:
* if {@link
CredentialsPolicy#PRIMARY_CREDENTIALS}, the authentication
@@ -186,32 +182,20 @@ class Connection extends SimpleChannelUpstreamHandler {
*/
Connection(ServerInfo serverInfo,
SecurityContext securityContext,
- HashedWheelTimer timer,
- ClientSocketChannelFactory channelFactory,
+ Bootstrap bootstrap,
CredentialsPolicy credentialsPolicy) {
this.serverInfo = serverInfo;
this.securityContext = securityContext;
this.state = State.NEW;
- this.timer = timer;
this.credentialsPolicy = credentialsPolicy;
-
- final ConnectionPipeline pipeline = new ConnectionPipeline();
- pipeline.init();
-
- channel = channelFactory.newChannel(pipeline);
- SocketChannelConfig config = channel.getConfig();
- config.setConnectTimeoutMillis(60000);
- config.setTcpNoDelay(true);
- // Unfortunately there is no way to override the keep-alive timeout in
- // Java since the JRE doesn't expose any way to call setsockopt() with
- // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
- config.setKeepAlive(true);
+ this.bootstrap = bootstrap.clone();
+ this.bootstrap.handler(new ConnectionChannelInitializer());
}
/** {@inheritDoc} */
@Override
- public void channelConnected(final ChannelHandlerContext ctx,
- final ChannelStateEvent e) {
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public void channelActive(final ChannelHandlerContext ctx) {
lock.lock();
try {
if (state == State.TERMINATED) {
@@ -222,56 +206,38 @@ class Connection extends SimpleChannelUpstreamHandler {
} finally {
lock.unlock();
}
- Channels.write(channel, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
+ ctx.writeAndFlush(Unpooled.wrappedBuffer(CONNECTION_HEADER),
ctx.voidPromise());
Negotiator negotiator = new
Negotiator(serverInfo.getAndCanonicalizeHostname(), securityContext,
(credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS));
- ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
- negotiator.sendHello(channel);
- }
-
- /** {@inheritDoc} */
- @Override
- public void handleUpstream(final ChannelHandlerContext ctx,
- final ChannelEvent e) throws Exception {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} upstream event {}", getLogPrefix(), e);
- }
- super.handleUpstream(ctx, e);
- }
-
- /** {@inheritDoc} */
- @Override
- public void channelDisconnected(final ChannelHandlerContext ctx, final
ChannelStateEvent e) {
- // No need to call super.channelDisconnected(ctx, e) -- there should be
nobody in the upstream
- // pipeline after Connection itself. So, just handle the disconnection
event ourselves.
- cleanup(new RecoverableException(Status.NetworkError("connection
disconnected")));
+ ctx.pipeline().addBefore(ctx.name(), "negotiation", negotiator);
+ negotiator.sendHello(ctx);
}
/** {@inheritDoc} */
@Override
- public void channelClosed(final ChannelHandlerContext ctx, final
ChannelStateEvent e) {
+ public void channelInactive(final ChannelHandlerContext ctx) {
+ LOG.debug("{} handling channelInactive", getLogPrefix());
String msg = "connection closed";
// Connection failures are reported as channelClosed() before
exceptionCaught() is called.
// We can detect this case by looking at whether connectFuture has been
marked complete
// and grabbing the exception from there.
lock.lock();
try {
- if (connectFuture != null && connectFuture.getCause() != null) {
- msg = connectFuture.getCause().toString();
+ if (connectFuture != null && connectFuture.cause() != null) {
+ msg = connectFuture.cause().toString();
}
} finally {
lock.unlock();
}
- // No need to call super.channelClosed(ctx, e) -- there should be nobody
in the upstream
+ // No need to call super.channelInactive(ctx, e) -- there should be nobody
in the upstream
// pipeline after Connection itself. So, just handle the close event
ourselves.
cleanup(new RecoverableException(Status.NetworkError(msg)));
}
/** {@inheritDoc} */
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
throws Exception {
- Object m = evt.getMessage();
-
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public void channelRead0(ChannelHandlerContext ctx, Object m) throws
Exception {
// Process the results of a successful negotiation.
if (m instanceof Negotiator.Success) {
lock.lock();
@@ -317,7 +283,7 @@ class Connection extends SimpleChannelUpstreamHandler {
queuedMessages = null;
// Drop the negotiation timeout handler from the pipeline.
- ctx.getPipeline().remove(NEGOTIATION_TIMEOUT_HANDLER);
+ ctx.pipeline().remove(NEGOTIATION_TIMEOUT_HANDLER);
// Set the state to READY -- that means the incoming messages should
be no longer put into
// the queuedMessages, but sent to wire right away (see the
enqueueMessage() for details).
@@ -343,15 +309,15 @@ class Connection extends SimpleChannelUpstreamHandler {
} finally {
lock.unlock();
}
- // Calling Channels.close() triggers the cleanup() which will handle the
negotiation
+ // Calling close() triggers the cleanup() which will handle the
negotiation
// failure appropriately.
- Channels.close(evt.getChannel());
+ ctx.close();
return;
}
// Some other event which the connection does not handle.
if (!(m instanceof CallResponse)) {
- ctx.sendUpstream(evt);
+ ctx.fireChannelRead(m);
return;
}
@@ -416,10 +382,8 @@ class Connection extends SimpleChannelUpstreamHandler {
/** {@inheritDoc} */
@Override
- public void exceptionCaught(final ChannelHandlerContext ctx, final
ExceptionEvent event) {
- Throwable e = event.getCause();
- Channel c = event.getChannel();
-
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws
Exception {
KuduException error;
if (e instanceof KuduException) {
error = (KuduException) e;
@@ -462,14 +426,16 @@ class Connection extends SimpleChannelUpstreamHandler {
// have either gotten a ClosedChannelException or an SSLException.
assert !explicitlyDisconnected;
String message = String.format("%s unexpected exception from downstream
on %s",
- getLogPrefix(), c);
+ getLogPrefix(), ctx.channel());
error = new RecoverableException(Status.NetworkError(message), e);
LOG.error(message, e);
}
cleanup(error);
- if (c.isOpen()) {
- Channels.close(c);
+ // `ctx` is null when `exceptionCaught` is called from the `connectFuture`
+ // listener in `connect()`.
+ if (ctx != null) {
+ ctx.close();
}
}
@@ -583,8 +549,20 @@ class Connection extends SimpleChannelUpstreamHandler {
* @return future object to wait on the disconnect completion, if necessary
*/
ChannelFuture disconnect() {
- explicitlyDisconnected = true;
- return Channels.disconnect(channel);
+ lock.lock();
+ try {
+ LOG.debug("{} disconnecting while in state {}", getLogPrefix(), state);
+ explicitlyDisconnected = true;
+ // No connection has been made yet.
+ if (state == State.NEW) {
+ // Use an EmbeddedChannel to return a valid and immediately completed
ChannelFuture.
+ return new EmbeddedChannel().disconnect();
+ } else {
+ return connectFuture.channel().disconnect();
+ }
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -617,7 +595,7 @@ class Connection extends SimpleChannelUpstreamHandler {
deferred.callback(null);
return;
}
- final Throwable t = future.getCause();
+ final Throwable t = future.cause();
if (t instanceof Exception) {
deferred.callback(t);
} else {
@@ -671,12 +649,13 @@ class Connection extends SimpleChannelUpstreamHandler {
* Start sending the message to the server over the wire. It's crucial to
not hold the lock
* while doing so: see enqueueMessage() and KUDU-1894 for details.
*/
+ @SuppressWarnings("FutureReturnValueIgnored")
private void sendCallToWire(final RpcOutboundMessage msg) {
assert !lock.isHeldByCurrentThread();
if (LOG.isTraceEnabled()) {
LOG.trace("{} sending {}", getLogPrefix(), msg);
}
- Channels.write(channel, msg);
+ channel.writeAndFlush(msg, channel.voidPromise());
}
/**
@@ -745,10 +724,24 @@ class Connection extends SimpleChannelUpstreamHandler {
/** Initiate opening TCP connection to the server. */
@GuardedBy("lock")
private void connect() {
+ LOG.debug("{} connecting to peer", getLogPrefix());
Preconditions.checkState(lock.isHeldByCurrentThread());
Preconditions.checkState(state == State.NEW);
state = State.CONNECTING;
- connectFuture = channel.connect(serverInfo.getResolvedAddress());
+ connectFuture = bootstrap.connect(serverInfo.getResolvedAddress());
+ connectFuture.addListener(new GenericFutureListener<Future<? super
Void>>() {
+ @Override
+ public void operationComplete(Future<? super Void> future) throws
Exception {
+ if (future.isSuccess()) {
+ LOG.debug("{} Successfully connected to peer", getLogPrefix());
+ return;
+ }
+ // If the connection failed, pass the exception to exceptionCaught to
be handled.
+ final Throwable t = future.cause();
+ exceptionCaught(null, t);
+ }
+ });
+ channel = (SocketChannel) connectFuture.channel();
}
/** Enumeration to represent the internal state of the Connection object. */
@@ -811,22 +804,23 @@ class Connection extends SimpleChannelUpstreamHandler {
}
}
- /** The helper class to build the Netty's connection pipeline. */
- private final class ConnectionPipeline extends DefaultChannelPipeline {
- void init() {
- super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
- KuduRpc.MAX_RPC_SIZE,
- 0, // length comes at offset 0
- 4, // length prefix is 4 bytes long
- 0, // no "length adjustment"
- 4 /* strip the length prefix */));
- super.addLast("decode-inbound", new CallResponse.Decoder());
- super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
+ private final class ConnectionChannelInitializer extends
ChannelInitializer<Channel> {
+ @Override
+ public void initChannel(Channel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
+ KuduRpc.MAX_RPC_SIZE,
+ 0, // length comes at offset 0
+ 4, // length prefix is 4 bytes long
+ 0, // no "length adjustment"
+ 4 /* strip the length prefix */));
+ pipeline.addLast("decode-inbound", new CallResponse.Decoder());
+ pipeline.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
// Add a socket read timeout handler to function as a timeout for
negotiation.
// The handler will be removed once the connection is negotiated.
- super.addLast(NEGOTIATION_TIMEOUT_HANDLER, new ReadTimeoutHandler(
- Connection.this.timer, NEGOTIATION_TIMEOUT_MS,
TimeUnit.MILLISECONDS));
- super.addLast("kudu-handler", Connection.this);
+ pipeline.addLast(NEGOTIATION_TIMEOUT_HANDLER,
+ new ReadTimeoutHandler(NEGOTIATION_TIMEOUT_MS,
TimeUnit.MILLISECONDS));
+ pipeline.addLast("kudu-handler", Connection.this);
}
}
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 6da15b5..83bff27 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -27,10 +27,9 @@ import javax.annotation.concurrent.GuardedBy;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.stumbleupon.async.Deferred;
+import io.netty.bootstrap.Bootstrap;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
/**
* The ConnectionCache is responsible for managing connections to Kudu masters
and tablet servers.
@@ -54,11 +53,8 @@ class ConnectionCache {
/** Security context to use for connection negotiation. */
private final SecurityContext securityContext;
- /** Timer to monitor read timeouts for connections (used by Netty's
ReadTimeoutHandler) */
- private final HashedWheelTimer timer;
-
- /** Netty's channel factory to use by connections. */
- private final ClientSocketChannelFactory channelFactory;
+ /** Netty's bootstrap to use by connections. */
+ private final Bootstrap bootstrap;
/**
* Container mapping server IP/port into the established connection from the
client to the
@@ -71,11 +67,9 @@ class ConnectionCache {
/** Create a new empty ConnectionCache given the specified parameters. */
ConnectionCache(SecurityContext securityContext,
- HashedWheelTimer timer,
- ClientSocketChannelFactory channelFactory) {
+ Bootstrap bootstrap) {
this.securityContext = securityContext;
- this.timer = timer;
- this.channelFactory = channelFactory;
+ this.bootstrap = bootstrap;
}
/**
@@ -127,8 +121,7 @@ class ConnectionCache {
if (result == null) {
result = new Connection(serverInfo,
securityContext,
- timer,
- channelFactory,
+ bootstrap,
credentialsPolicy);
connections.add(result);
// There can be at most 2 connections to the same destination: one
with primary and another
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
index f5c5eb5..0885fc2 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
@@ -22,8 +22,8 @@ import java.util.EnumSet;
import java.util.List;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.Schema;
import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
index dd8f1fc..127ec74 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/DeleteTableRequest.java
@@ -18,8 +18,8 @@
package org.apache.kudu.client;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.master.Master;
import org.apache.kudu.util.Pair;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
index 3be2770..7542e10 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableLocationsRequest.java
@@ -20,8 +20,8 @@ package org.apache.kudu.client;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.UnsafeByteOperations;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.master.Master;
import org.apache.kudu.util.Pair;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
index acd5744..4aac027 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
@@ -28,8 +28,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.Schema;
import org.apache.kudu.master.Master;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java
index e541f39..6a0baf4 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableStatisticsRequest.java
@@ -18,8 +18,8 @@
package org.apache.kudu.client;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.master.Master;
import org.apache.kudu.util.Pair;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
index 2bce8a1..3a8a321 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/IsAlterTableDoneRequest.java
@@ -22,8 +22,8 @@ import static
org.apache.kudu.master.Master.IsAlterTableDoneResponsePB;
import static org.apache.kudu.master.Master.TableIdentifierPB;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.util.Pair;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
index 76ca70a..2eb0d7c 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/IsCreateTableDoneRequest.java
@@ -18,8 +18,8 @@
package org.apache.kudu.client;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.master.Master.IsCreateTableDoneRequestPB;
import org.apache.kudu.master.Master.IsCreateTableDoneResponsePB;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index fc47700..7c84373 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -513,33 +513,49 @@ public class KuduClient implements AutoCloseable {
}
/**
- * Set the executors which will be used for the embedded Netty boss and
workers.
+ * @deprecated the bossExecutor is no longer used and will have no effect
if provided
+ */
+ @Deprecated
+ public KuduClientBuilder nioExecutors(Executor bossExecutor, Executor
workerExecutor) {
+ clientBuilder.nioExecutors(bossExecutor, workerExecutor);
+ return this;
+ }
+
+ /**
+ * Set the executor which will be used for the embedded Netty workers.
+ *
* Optional.
- * If not provided, uses a simple cached threadpool. If either argument is
null,
- * then such a thread pool will be used in place of that argument.
+ * If not provided, uses a simple cached threadpool. If workerExecutor is
null,
+ * then such a thread pool will be used.
* Note: executor's max thread number must be greater or equal to
corresponding
* worker count, or netty cannot start enough threads, and client will get
stuck.
* If not sure, please just use CachedThreadPool.
*/
- public KuduClientBuilder nioExecutors(Executor bossExecutor, Executor
workerExecutor) {
- clientBuilder.nioExecutors(bossExecutor, workerExecutor);
+ public KuduClientBuilder nioExecutor(Executor workerExecutor) {
+ clientBuilder.nioExecutor(workerExecutor);
return this;
}
/**
- * Set the maximum number of boss threads.
- * Optional.
- * If not provided, 1 is used.
+ * @deprecated the bossExecutor is no longer used and will have no effect
if provided
*/
+ @Deprecated
public KuduClientBuilder bossCount(int bossCount) {
- clientBuilder.bossCount(bossCount);
+ LOG.info("bossCount is deprecated");
return this;
}
/**
* Set the maximum number of worker threads.
+ * A worker thread performs non-blocking read and write for one or more
+ * Netty Channels in a non-blocking mode.
+ *
* Optional.
- * If not provided, (2 * the number of available processors) is used.
+ * If not provided, (2 * the number of available processors) is used. If
+ * this client instance will be used on a machine running many client
+ * instances, it may be wise to lower this count, for example to avoid
+ * resource limits, at the possible cost of some performance of this client
+ * instance.
*/
public KuduClientBuilder workerCount(int workerCount) {
clientBuilder.workerCount(workerCount);
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 56a626a..a224c65 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -39,12 +39,12 @@ import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.stumbleupon.async.Deferred;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.Timer;
-import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -426,26 +426,22 @@ public abstract class KuduRpc<R> {
}
// TODO(todd): make this private and have all RPCs send RpcOutboundMessage
- // instances instead of ChannelBuffers
- static ChannelBuffer toChannelBuffer(Message header, Message pb) {
+ // instances instead of ByteBuf
+ static void toByteBuf(ByteBuf out, Message header, Message pb) {
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, pb);
- byte[] buf = new byte[totalSize + 4];
- ChannelBuffer chanBuf = ChannelBuffers.wrappedBuffer(buf);
- chanBuf.clear();
- chanBuf.writeInt(totalSize);
- final CodedOutputStream out = CodedOutputStream.newInstance(buf, 4,
totalSize);
- try {
- out.writeUInt32NoTag(header.getSerializedSize());
- header.writeTo(out);
-
- out.writeUInt32NoTag(pb.getSerializedSize());
- pb.writeTo(out);
- out.checkNoSpaceLeft();
+ out.capacity(totalSize + 4);
+ out.writeInt(totalSize);
+ try (ByteBufOutputStream bos = new ByteBufOutputStream(out)) {
+ CodedOutputStream cos = CodedOutputStream.newInstance(bos, totalSize);
+ cos.writeUInt32NoTag(header.getSerializedSize());
+ header.writeTo(cos);
+
+ cos.writeUInt32NoTag(pb.getSerializedSize());
+ pb.writeTo(cos);
+ cos.flush();
} catch (IOException e) {
throw new RuntimeException("Cannot serialize the following message " +
pb);
}
- chanBuf.writerIndex(buf.length);
- return chanBuf;
}
/**
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
index 9c7161f..a462ba1 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTablesRequest.java
@@ -21,8 +21,8 @@ import java.util.ArrayList;
import java.util.List;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.client.ListTablesResponse.TableInfo;
import org.apache.kudu.master.Master;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
index 92b1ebb..9451c79 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletServersRequest.java
@@ -24,8 +24,8 @@ import java.util.ArrayList;
import java.util.List;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.util.Pair;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
index 277837a..7010bf3 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/ListTabletsRequest.java
@@ -21,8 +21,8 @@ import java.util.ArrayList;
import java.util.List;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.tserver.Tserver;
import org.apache.kudu.util.Pair;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 84a982d..75b1489 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -27,10 +27,13 @@
package org.apache.kudu.client;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.security.cert.Certificate;
@@ -60,18 +63,17 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
import org.apache.yetus.audience.InterfaceAudience;
import org.ietf.jgss.GSSException;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
-import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,7 +91,7 @@ import org.apache.kudu.util.SecurityUtil;
* from the pipeline and fires a Negotiator.Success or Negotiator.Failure
upstream.
*/
@InterfaceAudience.Private
-public class Negotiator extends SimpleChannelUpstreamHandler {
+public class Negotiator extends SimpleChannelInboundHandler<CallResponse> {
private static final Logger LOG = LoggerFactory.getLogger(Negotiator.class);
private final SaslClientCallbackHandler saslCallback = new
SaslClientCallbackHandler();
@@ -190,7 +192,7 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
* and add the handler directly to the real ChannelPipeline.
* Only non-null once TLS is initiated.
*/
- private DecoderEmbedder<ChannelBuffer> sslEmbedder;
+ private EmbeddedChannel sslEmbedder;
/**
* The nonce sent from the server to the client, or null if negotiation has
@@ -202,7 +204,7 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
* Future indicating whether the embedded handshake has completed.
* Only non-null once TLS is initiated.
*/
- private ChannelFuture sslHandshakeFuture;
+ private Future<Channel> sslHandshakeFuture;
private Certificate peerCert;
@@ -231,11 +233,11 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
}
}
- public void sendHello(Channel channel) {
- sendNegotiateMessage(channel);
+ public void sendHello(ChannelHandlerContext ctx) {
+ sendNegotiateMessage(ctx);
}
- private void sendNegotiateMessage(Channel channel) {
+ private void sendNegotiateMessage(ChannelHandlerContext ctx) {
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder()
.setStep(RpcHeader.NegotiatePB.NegotiateStep.NEGOTIATE);
@@ -244,7 +246,7 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) {
builder.addSupportedFeatures(flag);
}
- if (isLoopbackConnection(channel)) {
+ if (isLoopbackConnection(ctx.channel())) {
builder.addSupportedFeatures(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY);
}
@@ -265,57 +267,48 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
// Java client.
state = State.AWAIT_NEGOTIATE;
- sendSaslMessage(channel, builder.build());
+ sendSaslMessage(ctx, builder.build());
}
- private void sendSaslMessage(Channel channel, RpcHeader.NegotiatePB msg) {
- Preconditions.checkNotNull(channel);
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void sendSaslMessage(ChannelHandlerContext ctx,
RpcHeader.NegotiatePB msg) {
RpcHeader.RequestHeader.Builder builder =
RpcHeader.RequestHeader.newBuilder();
builder.setCallId(SASL_CALL_ID);
- Channels.write(channel, new RpcOutboundMessage(builder, msg));
+ ctx.writeAndFlush(new RpcOutboundMessage(builder, msg), ctx.voidPromise());
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
throws IOException {
- Object m = evt.getMessage();
- if (!(m instanceof CallResponse)) {
- ctx.sendUpstream(evt);
- return;
- }
- handleResponse(ctx.getChannel(), (CallResponse)m);
- }
-
- private void handleResponse(Channel chan, CallResponse callResponse) throws
IOException {
- final RpcHeader.ResponseHeader header = callResponse.getHeader();
+ public void channelRead0(ChannelHandlerContext ctx, CallResponse msg) throws
IOException {
+ final RpcHeader.ResponseHeader header = msg.getHeader();
if (header.getIsError()) {
final RpcHeader.ErrorStatusPB.Builder errBuilder =
RpcHeader.ErrorStatusPB.newBuilder();
- KuduRpc.readProtobuf(callResponse.getPBMessage(), errBuilder);
+ KuduRpc.readProtobuf(msg.getPBMessage(), errBuilder);
final RpcHeader.ErrorStatusPB error = errBuilder.build();
LOG.debug("peer {} sent connection negotiation error: {}",
- chan.getRemoteAddress(), error.getMessage());
+ ctx.channel().remoteAddress(), error.getMessage());
// The upstream code should handle the negotiation failure.
state = State.FINISHED;
- chan.getPipeline().remove(this);
- Channels.fireMessageReceived(chan, new Failure(error));
+ ctx.pipeline().remove(this);
+ ctx.fireChannelRead(new Failure(error));
return;
}
- RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
+ RpcHeader.NegotiatePB response = parseSaslMsgResponse(msg);
// TODO: check that the message type matches the expected one in all
// of the below implementations.
switch (state) {
case AWAIT_NEGOTIATE:
- handleNegotiateResponse(chan, response);
+ handleNegotiateResponse(ctx, response);
break;
case AWAIT_SASL:
- handleSaslMessage(chan, response);
+ handleSaslMessage(ctx, response);
break;
case AWAIT_TOKEN_EXCHANGE:
- handleTokenExchangeResponse(chan, response);
+ handleTokenExchangeResponse(ctx, response);
break;
case AWAIT_TLS_HANDSHAKE:
- handleTlsMessage(chan, response);
+ handleTlsMessage(ctx, response);
break;
default:
throw new IllegalStateException("received a message in unexpected
state: " +
@@ -323,13 +316,14 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
}
}
- private void handleSaslMessage(Channel chan, NegotiatePB response) throws
IOException {
+ private void handleSaslMessage(ChannelHandlerContext ctx, NegotiatePB
response)
+ throws IOException {
switch (response.getStep()) {
case SASL_CHALLENGE:
- handleChallengeResponse(chan, response);
+ handleChallengeResponse(ctx, response);
break;
case SASL_SUCCESS:
- handleSuccessResponse(chan, response);
+ handleSuccessResponse(ctx, response);
break;
default:
throw new IllegalStateException("Wrong negotiation step: " +
@@ -350,7 +344,7 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
return saslBuilder.build();
}
- private void handleNegotiateResponse(Channel chan,
+ private void handleNegotiateResponse(ChannelHandlerContext ctx,
RpcHeader.NegotiatePB response) throws
IOException {
Preconditions.checkState(response.getStep() == NegotiateStep.NEGOTIATE,
"Expected NEGOTIATE message, got {}", response.getStep());
@@ -370,9 +364,9 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
// If we negotiated TLS, then we want to start the TLS handshake;
otherwise,
// we can move directly to the authentication phase.
if (negotiatedTls) {
- startTlsHandshake(chan);
+ startTlsHandshake(ctx);
} else {
- startAuthentication(chan);
+ startAuthentication(ctx);
}
}
@@ -385,8 +379,8 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
return true;
}
try {
- InetAddress local =
((InetSocketAddress)channel.getLocalAddress()).getAddress();
- InetAddress remote =
((InetSocketAddress)channel.getRemoteAddress()).getAddress();
+ InetAddress local =
((InetSocketAddress)channel.localAddress()).getAddress();
+ InetAddress remote =
((InetSocketAddress)channel.remoteAddress()).getAddress();
return local.equals(remote);
} catch (ClassCastException cce) {
// In the off chance that we have some other type of local/remote
address,
@@ -538,7 +532,7 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
/**
* Send the initial TLS "ClientHello" message.
*/
- private void startTlsHandshake(Channel chan) throws SSLException {
+ private void startTlsHandshake(ChannelHandlerContext ctx) throws
SSLException {
SSLEngine engine;
switch (chosenAuthnType) {
case SASL:
@@ -566,12 +560,12 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
"Supported suites: " + Joiner.on(',').join(supported));
}
engine.setEnabledCipherSuites(toEnable.toArray(new String[0]));
- SslHandler handler = new SslHandler(engine);
- handler.setEnableRenegotiation(false);
- sslEmbedder = new DecoderEmbedder<>(handler);
- sslHandshakeFuture = handler.handshake();
+ SharableSslHandler handler = new SharableSslHandler(engine);
+
+ sslEmbedder = new EmbeddedChannel(handler);
+ sslHandshakeFuture = handler.handshakeFuture();
state = State.AWAIT_TLS_HANDSHAKE;
- boolean sent = sendPendingOutboundTls(chan);
+ boolean sent = sendPendingOutboundTls(ctx);
assert sent;
}
@@ -579,15 +573,17 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
* Handle an inbound message during the TLS handshake. If this message
* causes the handshake to complete, triggers the beginning of SASL
initiation.
*/
- private void handleTlsMessage(Channel chan, NegotiatePB response) throws
IOException {
+ private void handleTlsMessage(ChannelHandlerContext ctx, NegotiatePB
response)
+ throws IOException {
Preconditions.checkState(response.getStep() ==
NegotiateStep.TLS_HANDSHAKE);
Preconditions.checkArgument(!response.getTlsHandshake().isEmpty(),
"empty TLS message from server");
// Pass the TLS message into our embedded SslHandler.
- sslEmbedder.offer(ChannelBuffers.copiedBuffer(
+ sslEmbedder.writeInbound(Unpooled.copiedBuffer(
response.getTlsHandshake().asReadOnlyByteBuffer()));
- if (sendPendingOutboundTls(chan)) {
+ sslEmbedder.flush();
+ if (sendPendingOutboundTls(ctx)) {
// Data was sent -- we must continue the handshake process.
return;
}
@@ -598,8 +594,9 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
//
// NOTE: this takes effect immediately (i.e. the following SASL initiation
// sequence is encrypted).
- SslHandler handler = (SslHandler)sslEmbedder.getPipeline().getFirst();
- Certificate[] certs =
handler.getEngine().getSession().getPeerCertificates();
+ SharableSslHandler handler = (SharableSslHandler)
sslEmbedder.pipeline().first();
+ handler.resetAdded();
+ Certificate[] certs = handler.engine().getSession().getPeerCertificates();
if (certs.length == 0) {
throw new SSLPeerUnverifiedException("no peer cert found");
}
@@ -609,11 +606,11 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
// Don't wrap the TLS socket if we are using TLS for authentication only.
boolean isAuthOnly =
serverFeatures.contains(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY) &&
- isLoopbackConnection(chan);
+ isLoopbackConnection(ctx.channel());
if (!isAuthOnly) {
- chan.getPipeline().addFirst("tls", handler);
+ ctx.pipeline().addFirst("tls", handler);
}
- startAuthentication(chan);
+ startAuthentication(ctx);
}
/**
@@ -622,13 +619,17 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
*
* Otherwise, indicates that the handshake is complete by returning false.
*/
- private boolean sendPendingOutboundTls(Channel chan) {
+ private boolean sendPendingOutboundTls(ChannelHandlerContext ctx) {
// The SslHandler can generate multiple TLS messages in response
// (e.g. ClientKeyExchange, ChangeCipherSpec, ClientFinished).
// We poll the handler until it stops giving us buffers.
List<ByteString> bufs = Lists.newArrayList();
- while (sslEmbedder.peek() != null) {
- bufs.add(ByteString.copyFrom(sslEmbedder.poll().toByteBuffer()));
+ while (!sslEmbedder.outboundMessages().isEmpty()) {
+ ByteBuf msg = sslEmbedder.readOutbound();
+ bufs.add(ByteString.copyFrom(msg.nioBuffer()));
+ // Release the reference counted ByteBuf to avoid leaks now that we are
done with it.
+ // https://netty.io/wiki/reference-counted-objects.html
+ msg.release();
}
ByteString data = ByteString.copyFrom(bufs);
if (sslHandshakeFuture.isDone()) {
@@ -640,7 +641,7 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
return false;
} else {
assert data.size() > 0;
- sendTunneledTls(chan, data);
+ sendTunneledTls(ctx, data);
return true;
}
}
@@ -649,27 +650,28 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
* Send a buffer of data for the TLS handshake, encapsulated in the
* appropriate TLS_HANDSHAKE negotiation message.
*/
- private void sendTunneledTls(Channel chan, ByteString buf) {
- sendSaslMessage(chan, NegotiatePB.newBuilder()
+ private void sendTunneledTls(ChannelHandlerContext ctx, ByteString buf) {
+ sendSaslMessage(ctx, NegotiatePB.newBuilder()
.setStep(NegotiateStep.TLS_HANDSHAKE)
.setTlsHandshake(buf)
.build());
}
- private void startAuthentication(Channel chan) throws SaslException,
NonRecoverableException {
+ private void startAuthentication(ChannelHandlerContext ctx)
+ throws SaslException, NonRecoverableException {
switch (chosenAuthnType) {
case SASL:
- sendSaslInitiate(chan);
+ sendSaslInitiate(ctx);
break;
case TOKEN:
- sendTokenExchange(chan);
+ sendTokenExchange(ctx);
break;
default:
throw new AssertionError("unreachable");
}
}
- private void sendTokenExchange(Channel chan) {
+ private void sendTokenExchange(ChannelHandlerContext ctx) {
// We must not send a token unless we have successfully finished
// authenticating via TLS.
Preconditions.checkNotNull(authnToken);
@@ -680,19 +682,20 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
.setStep(NegotiateStep.TOKEN_EXCHANGE)
.setAuthnToken(authnToken);
state = State.AWAIT_TOKEN_EXCHANGE;
- sendSaslMessage(chan, builder.build());
+ sendSaslMessage(ctx, builder.build());
}
- private void handleTokenExchangeResponse(Channel chan, NegotiatePB response)
+ private void handleTokenExchangeResponse(ChannelHandlerContext ctx,
NegotiatePB response)
throws SaslException {
Preconditions.checkArgument(response.getStep() ==
NegotiateStep.TOKEN_EXCHANGE,
"expected TOKEN_EXCHANGE, got step: {}", response.getStep());
// The token response doesn't have any actual data in it, so we can just
move on.
- finish(chan);
+ finish(ctx);
}
- private void sendSaslInitiate(Channel chan) throws SaslException,
NonRecoverableException {
+ private void sendSaslInitiate(ChannelHandlerContext ctx)
+ throws SaslException, NonRecoverableException {
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
if (saslClient.hasInitialResponse()) {
byte[] initialResponse = evaluateChallenge(new byte[0]);
@@ -701,10 +704,10 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_INITIATE);
builder.addSaslMechanismsBuilder().setMechanism(chosenMech.name());
state = State.AWAIT_SASL;
- sendSaslMessage(chan, builder.build());
+ sendSaslMessage(ctx, builder.build());
}
- private void handleChallengeResponse(Channel chan, RpcHeader.NegotiatePB
response)
+ private void handleChallengeResponse(ChannelHandlerContext ctx,
RpcHeader.NegotiatePB response)
throws SaslException, NonRecoverableException {
byte[] saslToken = evaluateChallenge(response.getToken().toByteArray());
if (saslToken == null) {
@@ -713,7 +716,7 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
builder.setToken(UnsafeByteOperations.unsafeWrap(saslToken));
builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_RESPONSE);
- sendSaslMessage(chan, builder.build());
+ sendSaslMessage(ctx, builder.build());
}
/**
@@ -739,9 +742,10 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
}
}
- private void handleSuccessResponse(Channel chan, NegotiatePB response)
throws IOException {
+ private void handleSuccessResponse(ChannelHandlerContext ctx, NegotiatePB
response)
+ throws IOException {
Preconditions.checkState(saslClient.isComplete(),
- "server sent SASL_SUCCESS step, but SASL
negotiation is not complete");
+ "server sent SASL_SUCCESS step, but SASL negotiation is not
complete");
if (chosenMech == SaslMechanism.GSSAPI) {
if (response.hasNonce()) {
// Grab the nonce from the server, if it has sent one. We'll send it
back
@@ -755,21 +759,22 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
}
}
- finish(chan);
+ finish(ctx);
}
/**
* Marks the negotiation as finished, and sends the connection context to
the server.
- * @param chan the connection channel
+ * @param ctx the connection context
*/
- private void finish(Channel chan) throws SaslException {
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void finish(ChannelHandlerContext ctx) throws SaslException {
state = State.FINISHED;
- chan.getPipeline().remove(this);
+ ctx.pipeline().remove(this);
- Channels.write(chan, makeConnectionContext());
+ ctx.writeAndFlush(makeConnectionContext(), ctx.voidPromise());
LOG.debug("Authenticated connection {} using {}/{}",
- chan, chosenAuthnType, chosenMech);
- Channels.fireMessageReceived(chan, new Success(serverFeatures));
+ ctx.channel(), chosenAuthnType, chosenMech);
+ ctx.fireChannelRead(new Success(serverFeatures));
}
private RpcOutboundMessage makeConnectionContext() throws SaslException {
@@ -871,4 +876,39 @@ public class Negotiator extends
SimpleChannelUpstreamHandler {
this.status = status;
}
}
+
+ /**
+ * A hack to allow sharing the SslHandler even though it's not annotated as
"Sharable".
+ * We aren't technically sharing it, but when we move it from the
EmbeddedChannel to
+ * the actual channel above the sharing validation runs and throws an
exception.
+ *
+ *
https://netty.io/wiki/new-and-noteworthy-in-4.0.html#well-defined-thread-model
+ * https://netty.io/4.0/api/io/netty/channel/ChannelHandler.Sharable.html
+ *
+ * TODO (ghenke): Remove the need for this reflection.
+ */
+ static class SharableSslHandler extends SslHandler {
+
+ public SharableSslHandler(SSLEngine engine) {
+ super(engine);
+ }
+
+ void resetAdded() {
+ Field addedField =
AccessController.doPrivileged((PrivilegedAction<Field>) () -> {
+ try {
+ Class<?> c = ChannelHandlerAdapter.class;
+ Field added = c.getDeclaredField("added");
+ added.setAccessible(true);
+ return added;
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ try {
+ addedField.setBoolean(this, false);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index ec6b768..531c776 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -29,9 +29,9 @@ import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.UnsafeByteOperations;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
index 877a571..cbb8e3c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PingRequest.java
@@ -18,9 +18,9 @@
package org.apache.kudu.client;
import com.google.protobuf.Message;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.master.Master;
import org.apache.kudu.util.Pair;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
index ab1ce40..9fcd221 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcOutboundMessage.java
@@ -19,9 +19,9 @@ package org.apache.kudu.client;
import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,21 +61,15 @@ class RpcOutboundMessage {
/**
* Netty encoder implementation to serialize outbound messages.
*/
- static class Encoder extends OneToOneEncoder {
+ static class Encoder extends MessageToByteEncoder<RpcOutboundMessage> {
+
@Override
- protected Object encode(ChannelHandlerContext ctx, Channel chan,
- Object obj) throws Exception {
- if (!(obj instanceof RpcOutboundMessage)) {
- return obj;
- }
- RpcOutboundMessage msg = (RpcOutboundMessage)obj;
+ protected void encode(ChannelHandlerContext ctx, RpcOutboundMessage msg,
ByteBuf out) {
if (LOG.isTraceEnabled()) {
- LOG.trace("{}: sending RPC {}", chan, msg);
+ LOG.trace("{}: sending RPC {}", ctx.channel(), msg);
}
- // TODO(todd): move this impl into this class and remove external
- // callers.
- return KuduRpc.toChannelBuffer(msg.getHeaderBuilder().build(),
- msg.getBody());
+ // TODO(todd): move this impl into this class and remove external
callers.
+ KuduRpc.toByteBuf(out, msg.getHeaderBuilder().build(), msg.getBody());
}
}
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java
index 3a82e56..b167736 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/SplitKeyRangeRequest.java
@@ -22,8 +22,8 @@ import java.util.List;
import com.google.protobuf.Message;
import com.google.protobuf.UnsafeByteOperations;
+import io.netty.util.Timer;
import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.util.Timer;
import org.apache.kudu.Common.KeyRangePB;
import org.apache.kudu.security.Token;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
index dd54a4e..2da927a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
@@ -21,7 +21,6 @@ import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
-import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Functions;
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index 5cc72e8..5e5943f 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -189,6 +189,7 @@ public class ITClient {
* @return true if successfully completed or didn't find a server to
disconnect, false it it
* encountered a failure
*/
+ @SuppressWarnings("FutureReturnValueIgnored")
private boolean disconnectNode() {
try {
final List<Connection> connections =
harness.getAsyncClient().getConnectionListCopy();
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
index 76d9471..c3c2d2d 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
@@ -160,6 +160,7 @@ public class ITScannerMultiTablet {
* @param isFaultTolerant if true use fault-tolerant scanner, otherwise use
non-fault-tolerant one
* @throws Exception
*/
+ @SuppressWarnings("FutureReturnValueIgnored")
void clientFaultInjection(boolean isFaultTolerant) throws KuduException {
KuduScanner scanner = harness.getClient().newScannerBuilder(table)
.setFaultTolerant(isFaultTolerant)
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index 17d3a15..a7fdb9b 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -111,6 +111,7 @@ public class TestAsyncKuduClient {
assertEquals(rowCount - numRows, countRowsInScan(scanner));
}
+ @SuppressWarnings("FutureReturnValueIgnored")
private void disconnectAndWait() throws InterruptedException {
for (Connection c : asyncClient.getConnectionListCopy()) {
c.disconnect();
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
index 24aef14..82b2035 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthTokenReacquire.java
@@ -84,6 +84,7 @@ public class TestAuthTokenReacquire {
asyncClient = harness.getAsyncClient();
}
+ @SuppressWarnings("FutureReturnValueIgnored")
private void dropConnections() {
for (Connection c : asyncClient.getConnectionListCopy()) {
c.disconnect();
@@ -172,7 +173,7 @@ public class TestAuthTokenReacquire {
}
if (!exceptions.isEmpty()) {
for (Map.Entry<Integer, Throwable> e : exceptions.entrySet()) {
- LOG.error("exception in thread {}: {}", e.getKey(), e.getValue());
+ LOG.error(String.format("exception in thread %s:", e.getKey()),
e.getValue());
}
fail("test failed: unexpected errors");
}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
index dc98a65..f7d23b4 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
@@ -78,6 +78,7 @@ public class TestAuthnTokenReacquireOpen {
asyncClient = harness.getAsyncClient();
}
+ @SuppressWarnings("FutureReturnValueIgnored")
private void dropConnections() {
for (Connection c : asyncClient.getConnectionListCopy()) {
c.disconnect();
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
index 492b5a3..4b1d4a3 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
@@ -17,13 +17,13 @@
package org.apache.kudu.client;
-import java.util.List;
-
import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.FOLLOWER;
import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.LEADER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import java.util.List;
+
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.stumbleupon.async.Callback;
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index f202435..19ebfc3 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -37,6 +37,7 @@ public class TestConnectionCache {
public RetryRule retryRule = new RetryRule();
@Test(timeout = 50000)
+ @SuppressWarnings("FutureReturnValueIgnored")
public void test() throws Exception {
try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
.numMasterServers(3)
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index c60578d..d73bb44 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -37,11 +37,14 @@ import javax.net.ssl.SSLException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
-import org.jboss.netty.handler.ssl.SslHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.ssl.SslHandler;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -49,8 +52,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kudu.client.Negotiator.Success;
+import org.apache.kudu.rpc.RpcHeader;
import org.apache.kudu.rpc.RpcHeader.AuthenticationTypePB;
-import org.apache.kudu.rpc.RpcHeader.ConnectionContextPB;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB.SaslMechanism;
@@ -63,7 +66,7 @@ import org.apache.kudu.util.SecurityUtil;
public class TestNegotiator {
static final Logger LOG = LoggerFactory.getLogger(TestNegotiator.class);
- private DecoderEmbedder<Object> embedder;
+ private EmbeddedChannel embedder;
private SecurityContext secContext;
private SSLEngine serverEngine;
@@ -107,19 +110,21 @@ public class TestNegotiator {
private void startNegotiation(boolean fakeLoopback) {
Negotiator negotiator = new Negotiator("127.0.0.1", secContext, false);
negotiator.overrideLoopbackForTests = fakeLoopback;
- embedder = new DecoderEmbedder<>(negotiator);
- negotiator.sendHello(embedder.getPipeline().getChannel());
+ embedder = new EmbeddedChannel(negotiator);
+ negotiator.sendHello(embedder.pipeline().firstContext());
}
static CallResponse fakeResponse(ResponseHeader header, Message body) {
- ChannelBuffer buf = KuduRpc.toChannelBuffer(header, body);
+ ByteBuf buf = Unpooled.buffer();
+ KuduRpc.toByteBuf(buf, header, body);
buf = buf.slice(4, buf.readableBytes() - 4);
return new CallResponse(buf);
}
KeyStore loadTestKeystore() throws Exception {
KeyStore ks = KeyStore.getInstance("JKS");
- try (InputStream stream =
TestNegotiator.class.getResourceAsStream("/test-key-and-cert.jks")) {
+ try (InputStream stream =
+
TestNegotiator.class.getResourceAsStream("/test-key-and-cert.jks")) {
ks.load(stream, KEYSTORE_PASSWORD);
}
return ks;
@@ -142,14 +147,17 @@ public class TestNegotiator {
* a Negotiation.Success to the pipeline.
* @return the result
*/
- private Success assertComplete() {
- RpcOutboundMessage msg = (RpcOutboundMessage)embedder.poll();
- ConnectionContextPB connCtx = (ConnectionContextPB)msg.getBody();
+ private Success assertComplete(boolean isTls) throws Exception {
+ RpcOutboundMessage msg = isTls ?
+ unwrapOutboundMessage(embedder.readOutbound(),
+ RpcHeader.ConnectionContextPB.newBuilder()) :
+ embedder.readOutbound();
+ RpcHeader.ConnectionContextPB connCtx = (RpcHeader.ConnectionContextPB)
msg.getBody();
assertEquals(Negotiator.CONNECTION_CTX_CALL_ID,
msg.getHeaderBuilder().getCallId());
assertEquals(System.getProperty("user.name"),
connCtx.getDEPRECATEDUserInfo().getRealUser());
// Expect the client to also emit a negotiation Success.
- Success success = (Success)embedder.poll();
+ Success success = embedder.readInbound();
assertNotNull(success);
return success;
}
@@ -166,25 +174,26 @@ public class TestNegotiator {
* Simple test case for a PLAIN negotiation.
*/
@Test
- public void testNegotiation() {
+ public void testNegotiation() throws Exception {
startNegotiation(false);
// Expect client->server: NEGOTIATE.
- RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+ RpcOutboundMessage msg = embedder.readOutbound();
NegotiatePB body = (NegotiatePB) msg.getBody();
assertEquals(Negotiator.SASL_CALL_ID, msg.getHeaderBuilder().getCallId());
assertEquals(NegotiateStep.NEGOTIATE, body.getStep());
// Respond with NEGOTIATE.
- embedder.offer(fakeResponse(
+ embedder.writeInbound(fakeResponse(
ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
NegotiatePB.newBuilder()
.addSaslMechanisms(SaslMechanism.newBuilder().setMechanism("PLAIN"))
.setStep(NegotiateStep.NEGOTIATE)
.build()));
+ embedder.flushInbound();
// Expect client->server: SASL_INITIATE (PLAIN)
- msg = (RpcOutboundMessage)embedder.poll();
+ msg = embedder.readOutbound();
body = (NegotiatePB) msg.getBody();
assertEquals(Negotiator.SASL_CALL_ID, msg.getHeaderBuilder().getCallId());
@@ -194,14 +203,15 @@ public class TestNegotiator {
assertTrue(body.hasToken());
// Respond with SASL_SUCCESS:
- embedder.offer(fakeResponse(
+ embedder.writeInbound(fakeResponse(
ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
NegotiatePB.newBuilder()
.setStep(NegotiateStep.SASL_SUCCESS)
.build()));
+ embedder.flushInbound();
// Expect client->server: ConnectionContext
- assertComplete();
+ assertComplete(/*isTls*/ false);
}
private static void runTasks(SSLEngineResult result,
@@ -253,21 +263,30 @@ public class TestNegotiator {
* Completes the 3-step TLS handshake, assuming that the client is
* about to generate the first of the messages.
*/
- private void runTlsHandshake() throws SSLException {
- RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+ private void runTlsHandshake(boolean isAuthOnly) throws SSLException {
+ RpcOutboundMessage msg = embedder.readOutbound();
NegotiatePB body = (NegotiatePB) msg.getBody();
assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
// Consume the ClientHello in our fake server, which should generate
ServerHello.
- embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+ embedder.writeInbound(runServerStep(serverEngine, body.getTlsHandshake()));
+ embedder.flushInbound();
// Expect client to generate ClientKeyExchange, ChangeCipherSpec, Finished.
- msg = (RpcOutboundMessage) embedder.poll();
+ msg = embedder.readOutbound();
body = (NegotiatePB) msg.getBody();
assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
+ // Now that the handshake is complete, we need to encode
RpcOutboundMessages
+ // to ByteBuf to be accepted by the the SslHandler.
+ // This encoder is added to the pipeline by the Connection in normal
Negotiator usage.
+ if (!isAuthOnly) {
+ embedder.pipeline().addFirst("encode-outbound", new
RpcOutboundMessage.Encoder());
+ }
+
// Server consumes the above. Should send the TLS "Finished" message.
- embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+ embedder.writeInbound(runServerStep(serverEngine, body.getTlsHandshake()));
+ embedder.flushInbound();
}
@Test
@@ -275,32 +294,29 @@ public class TestNegotiator {
startNegotiation(false);
// Expect client->server: NEGOTIATE, TLS included.
- RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+ RpcOutboundMessage msg = embedder.readOutbound();
NegotiatePB body = (NegotiatePB) msg.getBody();
assertEquals(NegotiateStep.NEGOTIATE, body.getStep());
assertTrue(body.getSupportedFeaturesList().contains(RpcFeatureFlag.TLS));
// Fake a server response with TLS enabled.
- embedder.offer(fakeResponse(
+ embedder.writeInbound(fakeResponse(
ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
NegotiatePB.newBuilder()
.addSaslMechanisms(NegotiatePB.SaslMechanism.newBuilder().setMechanism("PLAIN"))
.addSupportedFeatures(RpcFeatureFlag.TLS)
.setStep(NegotiateStep.NEGOTIATE)
.build()));
+ embedder.flushInbound();
// Expect client->server: TLS_HANDSHAKE.
- runTlsHandshake();
+ runTlsHandshake(/*isAuthOnly*/ false);
// The pipeline should now have an SSL handler as the first handler.
- assertTrue(embedder.getPipeline().getFirst() instanceof SslHandler);
+ assertTrue(embedder.pipeline().first() instanceof SslHandler);
// The Negotiator should have sent the SASL_INITIATE at this point.
- // NOTE: in a non-mock environment, this message would now be encrypted
- // by the newly-added TLS handler. But, with the DecoderEmbedder that we're
- // using, we don't actually end up processing outbound events. Upgrading
- // to Netty 4 and using EmbeddedChannel instead would make this more
realistic.
- msg = (RpcOutboundMessage) embedder.poll();
+ msg = unwrapOutboundMessage(embedder.readOutbound(),
RpcHeader.NegotiatePB.newBuilder());
body = (NegotiatePB) msg.getBody();
assertEquals(NegotiateStep.SASL_INITIATE, body.getStep());
}
@@ -310,7 +326,7 @@ public class TestNegotiator {
startNegotiation(true);
// Expect client->server: NEGOTIATE, TLS and TLS_AUTHENTICATION_ONLY
included.
- RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+ RpcOutboundMessage msg = embedder.readOutbound();
NegotiatePB body = (NegotiatePB) msg.getBody();
assertEquals(NegotiateStep.NEGOTIATE, body.getStep());
assertTrue(body.getSupportedFeaturesList().contains(RpcFeatureFlag.TLS));
@@ -318,7 +334,7 @@ public class TestNegotiator {
RpcFeatureFlag.TLS_AUTHENTICATION_ONLY));
// Fake a server response with TLS and TLS_AUTHENTICATION_ONLY enabled.
- embedder.offer(fakeResponse(
+ embedder.writeInbound(fakeResponse(
ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
NegotiatePB.newBuilder()
.addSaslMechanisms(NegotiatePB.SaslMechanism.newBuilder().setMechanism("PLAIN"))
@@ -326,16 +342,17 @@ public class TestNegotiator {
.addSupportedFeatures(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY)
.setStep(NegotiateStep.NEGOTIATE)
.build()));
+ embedder.flushInbound();
// Expect client->server: TLS_HANDSHAKE.
- runTlsHandshake();
+ runTlsHandshake(/*isAuthOnly*/ true);
// The pipeline should *not* have an SSL handler as the first handler,
// since we used TLS for authentication only.
- assertFalse(embedder.getPipeline().getFirst() instanceof SslHandler);
+ assertFalse(embedder.pipeline().first() instanceof SslHandler);
// The Negotiator should have sent the SASL_INITIATE at this point.
- msg = (RpcOutboundMessage) embedder.poll();
+ msg = embedder.readOutbound();
body = (NegotiatePB) msg.getBody();
assertEquals(NegotiateStep.SASL_INITIATE, body.getStep());
}
@@ -350,7 +367,7 @@ public class TestNegotiator {
startNegotiation(false);
// Expect client->server: NEGOTIATE, TLS included, Token not included.
- RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+ RpcOutboundMessage msg = embedder.readOutbound();
NegotiatePB body = (NegotiatePB) msg.getBody();
assertEquals("supported_features: APPLICATION_FEATURE_FLAGS " +
"supported_features: TLS " +
@@ -369,7 +386,7 @@ public class TestNegotiator {
startNegotiation(false);
// Expect client->server: NEGOTIATE, TLS included, Token included.
- RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+ RpcOutboundMessage msg = embedder.readOutbound();
NegotiatePB body = (NegotiatePB) msg.getBody();
assertEquals("supported_features: APPLICATION_FEATURE_FLAGS " +
"supported_features: TLS " +
@@ -378,7 +395,7 @@ public class TestNegotiator {
"authn_types { token { } }", TextFormat.shortDebugString(body));
// Fake a server response with TLS enabled and TOKEN chosen.
- embedder.offer(fakeResponse(
+ embedder.writeInbound(fakeResponse(
ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
NegotiatePB.newBuilder()
.addSupportedFeatures(RpcFeatureFlag.TLS)
@@ -386,24 +403,59 @@ public class TestNegotiator {
AuthenticationTypePB.Token.getDefaultInstance()))
.setStep(NegotiateStep.NEGOTIATE)
.build()));
+ embedder.flushInbound();
// Expect to now run the TLS handshake
- runTlsHandshake();
+ runTlsHandshake(/*isAuthOnly*/ false);
// Expect the client to send the token.
- msg = (RpcOutboundMessage) embedder.poll();
+ msg = unwrapOutboundMessage(embedder.readOutbound(),
RpcHeader.NegotiatePB.newBuilder());
body = (NegotiatePB) msg.getBody();
assertEquals("step: TOKEN_EXCHANGE authn_token { }",
TextFormat.shortDebugString(body));
// Fake a response indicating success.
- embedder.offer(fakeResponse(
+ embedder.writeInbound(fakeResponse(
ResponseHeader.newBuilder().setCallId(Negotiator.SASL_CALL_ID).build(),
NegotiatePB.newBuilder()
.setStep(NegotiateStep.TOKEN_EXCHANGE)
.build()));
+ embedder.flushInbound();
+
+ // TODO (ghenke): For some reason the SslHandler adds an extra empty
message here.
+ // This should be harmless, but it would be good to understand or fix why.
+ ByteBuf empty = embedder.readOutbound();
+ assertEquals(0, empty.readableBytes());
// Should be complete now.
- assertComplete();
+ assertComplete(/*isTls*/ true);
+ }
+
+ private RpcOutboundMessage unwrapOutboundMessage(ByteBuf wrappedBuf,
+ Message.Builder
requestBuilder)
+ throws Exception {
+ // Create an SSL handle to handle unwrapping the ssl message.
+ SslHandler handler = new SslHandler(serverEngine);
+ EmbeddedChannel serverSSLChannel = new EmbeddedChannel(handler);
+
+ // Pass the ssl message through the channel with the ssl handler.
+ serverSSLChannel.writeInbound(wrappedBuf);
+ serverSSLChannel.flushInbound();
+ ByteBuf unwrappedbuf = serverSSLChannel.readInbound();
+
+ // Read the message size and bytes.
+ final int size = unwrappedbuf.readInt();
+ final byte [] bytes = new byte[size];
+ unwrappedbuf.getBytes(unwrappedbuf.readerIndex(), bytes);
+
+ // Parse the message header.
+ final CodedInputStream in = CodedInputStream.newInstance(bytes);
+ RpcHeader.RequestHeader.Builder header =
RpcHeader.RequestHeader.newBuilder();
+ in.readMessage(header, ExtensionRegistry.getEmptyRegistry());
+
+ // Parse the request message.
+ in.readMessage(requestBuilder, ExtensionRegistry.getEmptyRegistry());
+
+ return new RpcOutboundMessage(header, requestBuilder.build());
}
}