This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 7473c89 DRILL-7790 : Build Drill with Netty version 4.1.59.Final
7473c89 is described below
commit 7473c890284f8482032043a76789388e3a568439
Author: Rymar Maksym <[email protected]>
AuthorDate: Thu Feb 25 20:32:11 2021 +0200
DRILL-7790 : Build Drill with Netty version 4.1.59.Final
---
.../drill/common/collections/MapWithOrdinal.java | 8 +-
.../java/org/apache/drill/exec/record/DeadBuf.java | 225 ++++++++++++++++-----
.../drill/exec/rpc/UserClientConnection.java | 6 +-
.../org/apache/drill/exec/rpc/user/UserServer.java | 11 +-
.../exec/server/rest/BaseWebUserConnection.java | 5 +-
.../drill/exec/server/rest/DrillRestServer.java | 12 +-
.../exec/server/rest/WebSessionResources.java | 8 +-
.../drill/exec/server/rest/WebUserConnection.java | 8 +-
.../apache/drill/exec/work/foreman/Foreman.java | 5 +-
.../work/prepare/PreparedStatementProvider.java | 6 +-
.../drill/exec/server/rest/RestServerTest.java | 5 +-
.../exec/server/rest/WebSessionResourcesTest.java | 17 +-
exec/jdbc-all/pom.xml | 4 +-
.../src/main/java/io/netty/buffer/DrillBuf.java | 141 ++++++++++++-
.../io/netty/buffer/MutableWrappedByteBuf.java | 69 ++++++-
.../io/netty/buffer/PooledByteBufAllocatorL.java | 14 +-
.../io/netty/buffer/UnsafeDirectLittleEndian.java | 2 +-
.../drill/exec/memory/DrillByteBufAllocator.java | 37 ++--
pom.xml | 6 +-
19 files changed, 443 insertions(+), 146 deletions(-)
diff --git
a/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
b/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
index 66e04ac..458d833 100644
---
a/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
+++
b/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
@@ -29,7 +29,6 @@ import
org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import io.netty.util.collection.IntObjectHashMap;
-import io.netty.util.collection.IntObjectMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,12 +128,7 @@ public class MapWithOrdinal<K, V> implements Map<K, V> {
@Override
public Collection<V> values() {
- return Lists.newArrayList(Iterables.transform(secondary.entries(), new
Function<IntObjectMap.Entry<V>, V>() {
- @Override
- public V apply(IntObjectMap.Entry<V> entry) {
- return Preconditions.checkNotNull(entry).value();
- }
- }));
+ return Lists.newArrayList(Iterables.transform(secondary.entries(), entry
-> Preconditions.checkNotNull(entry).value()));
}
@Override
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
index 271da41..c3c3539 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.record;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufProcessor;
+import io.netty.util.ByteProcessor;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
@@ -27,6 +27,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
@@ -222,36 +223,76 @@ public class DeadBuf extends ByteBuf {
}
@Override
+ public short getShortLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public int getUnsignedShort(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public int getUnsignedShortLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public int getMedium(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public int getMediumLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public int getUnsignedMedium(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public int getUnsignedMediumLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public int getInt(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public int getIntLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public long getUnsignedInt(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public long getUnsignedIntLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public long getLong(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public long getLongLE(int index) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public CharSequence getCharSequence(int index, int length, Charset charset) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public char getChar(int index) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -267,6 +308,11 @@ public class DeadBuf extends ByteBuf {
}
@Override
+ public int getBytes(int index, FileChannel out, long position, int length) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf getBytes(int index, ByteBuf dst) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -322,21 +368,46 @@ public class DeadBuf extends ByteBuf {
}
@Override
+ public ByteBuf setShortLE(int index, int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf setMedium(int index, int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public ByteBuf setMediumLE(int index, int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf setInt(int index, int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public ByteBuf setIntLE(int index, int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf setLong(int index, long value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
+ public ByteBuf setLongLE(int index, long value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public int setCharSequence(int index, CharSequence sequence, Charset
charset) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf setChar(int index, int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -352,6 +423,11 @@ public class DeadBuf extends ByteBuf {
}
@Override
+ public int setBytes(int index, FileChannel in, long position, int length) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public ByteBuf setBytes(int index, ByteBuf src) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -418,7 +494,11 @@ public class DeadBuf extends ByteBuf {
@Override
public short readShort() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public short readShortLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
@@ -428,39 +508,68 @@ public class DeadBuf extends ByteBuf {
}
@Override
+ public int readUnsignedShortLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
public int readMedium() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public int readMediumLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public int readUnsignedMedium() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public int readUnsignedMediumLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public int readInt() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public int readIntLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public long readUnsignedInt() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public long readUnsignedIntLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public long readLong() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public long readLongLE() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public char readChar() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public CharSequence readCharSequence(int length, Charset charset) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
@@ -472,310 +581,314 @@ public class DeadBuf extends ByteBuf {
@Override
public double readDouble() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public int readBytes(FileChannel out, long position, int length) throws
IOException {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf readBytes(int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readSlice(int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(ByteBuf dst) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(ByteBuf dst, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(byte[] dst) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(ByteBuffer dst) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf readBytes(OutputStream out, int length) throws IOException {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int readBytes(GatheringByteChannel out, int length) throws
IOException {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf skipBytes(int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBoolean(boolean value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeByte(int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeShort(int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf writeShortLE(int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf writeMedium(int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf writeMediumLE(int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf writeInt(int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf writeIntLE(int value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf writeLong(long value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf writeLongLE(long value) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf writeChar(int value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeFloat(float value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeDouble(double value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public int writeBytes(FileChannel in, long position, int length) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
- public ByteBuf writeBytes(ByteBuf src) {
+ public int writeCharSequence(CharSequence sequence, Charset charset) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf writeBytes(ByteBuf src) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf writeBytes(ByteBuf src, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBytes(byte[] src) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeBytes(ByteBuffer src) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int writeBytes(InputStream in, int length) throws IOException {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws
IOException {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf writeZero(int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int indexOf(int fromIndex, int toIndex, byte value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int bytesBefore(byte value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int bytesBefore(int length, byte value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int bytesBefore(int index, int length, byte value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf copy() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuf copy(int index, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf readRetainedSlice(int length) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf slice() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf retainedSlice() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public ByteBuf slice(int index, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf retainedSlice(int index, int length) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
- public ByteBuf duplicate() {
+ public ByteBuf retainedDuplicate() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ @Override
+ public ByteBuf duplicate() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public int nioBufferCount() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuffer nioBuffer() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuffer nioBuffer(int index, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuffer[] nioBuffers() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public ByteBuffer[] nioBuffers(int index, int length) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public boolean hasArray() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public byte[] array() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public int arrayOffset() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public boolean hasMemoryAddress() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public long memoryAddress() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public String toString(Charset charset) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
public String toString(int index, int length, Charset charset) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
-
}
@Override
@@ -793,33 +906,51 @@ public class DeadBuf extends ByteBuf {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
+ @Override
+ public boolean isReadOnly() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+
+ @Override
+ public ByteBuf asReadOnly() {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
@Override
public boolean equals(Object arg0) {
return false;
}
+ @Override
+ public ByteBuf touch() {
+ return this;
+ }
+
+ @Override
+ public ByteBuf touch(Object hint) {
+ return this;
+ }
@Override
- public int forEachByte(ByteBufProcessor arg0) {
+ public int forEachByte(ByteProcessor arg0) {
return 0;
}
@Override
- public int forEachByte(int arg0, int arg1, ByteBufProcessor arg2) {
+ public int forEachByte(int arg0, int arg1, ByteProcessor arg2) {
return 0;
}
@Override
- public int forEachByteDesc(ByteBufProcessor arg0) {
+ public int forEachByteDesc(ByteProcessor arg0) {
return 0;
}
@Override
- public int forEachByteDesc(int arg0, int arg1, ByteBufProcessor arg2) {
+ public int forEachByteDesc(int arg0, int arg1, ByteProcessor arg2) {
return 0;
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
index 179cc7c..85d14dc 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.rpc;
-import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -60,10 +60,10 @@ public interface UserClientConnection {
void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data);
/**
- * Returns the {@link ChannelFuture} which will be notified when this
+ * Returns the {@link Future} which will be notified when this
* channel is closed. This method always returns the same future instance.
*/
- ChannelFuture getChannelClosureFuture();
+ Future<Void> getClosureFuture();
/**
* @return Return the client node address.
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index cb1db13..892636c 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -19,14 +19,12 @@ package org.apache.drill.exec.rpc.user;
import com.google.protobuf.MessageLite;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.common.config.DrillProperties;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.DrillbitStartupException;
@@ -284,14 +282,9 @@ public class UserServer extends BasicServer<RpcType,
BitToUserConnection> {
}
@Override
- public ChannelFuture getChannelClosureFuture() {
+ public Future<Void> getClosureFuture() {
return getChannel().closeFuture()
- .addListener(new GenericFutureListener<Future<? super Void>>() {
- @Override
- public void operationComplete(Future<? super Void> future) throws
Exception {
- cleanup();
- }
- });
+ .addListener(future -> cleanup());
}
@Override
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
index f847323..c04a75d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
@@ -19,13 +19,12 @@ package org.apache.drill.exec.server.rest;
import java.net.SocketAddress;
+import io.netty.util.concurrent.Future;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
import org.apache.drill.exec.rpc.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.UserSession;
-import io.netty.channel.ChannelFuture;
-
public abstract class BaseWebUserConnection extends
AbstractDisposableUserClientConnection implements ConnectionThrottle {
protected WebSessionResources webSessionResources;
@@ -40,7 +39,7 @@ public abstract class BaseWebUserConnection extends
AbstractDisposableUserClient
}
@Override
- public ChannelFuture getChannelClosureFuture() {
+ public Future<Void> getClosureFuture() {
return webSessionResources.getCloseFuture();
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index c345571..8473d64 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -29,8 +29,8 @@ import freemarker.cache.TemplateLoader;
import freemarker.cache.WebappTemplateLoader;
import freemarker.core.HTMLOutputFormat;
import freemarker.template.Configuration;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.DefaultPromise;
+import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.EventExecutor;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
@@ -221,11 +221,11 @@ public class DrillRestServer extends ResourceConfig {
config.getLong(ExecConstants.HTTP_SESSION_MEMORY_RESERVATION),
config.getLong(ExecConstants.HTTP_SESSION_MEMORY_MAXIMUM));
- // Create a dummy close future which is needed by Foreman only.
Foreman uses this future to add a close
+ // Create a future which is needed by Foreman only. Foreman uses this
future to add a close
// listener to known about channel close event from underlying layer.
We use this future to notify Foreman
// listeners when the Web session (not connection) between Web Client
and WebServer is closed. This will help
// Foreman to cancel all the running queries for this Web Client.
- final ChannelPromise closeFuture = new DefaultChannelPromise(null,
executor);
+ final Promise<Void> closeFuture = new DefaultPromise<>(executor);
// Create a WebSessionResource instance which owns the lifecycle of
all the session resources.
// Set this instance as an attribute of HttpSession, since it will be
used until session is destroyed
@@ -283,12 +283,12 @@ public class DrillRestServer extends ResourceConfig {
logger.trace("Failed to get the remote address of the http session
request", ex);
}
- // Create a dummy close future which is needed by Foreman only. Foreman
uses this future to add a close
+ // Create a close future which is needed by Foreman only. Foreman uses
this future to add a close
// listener to known about channel close event from underlying layer.
//
// The invocation of this close future is no-op as it will be triggered
after query completion in unsecure case.
// But we need this close future as it's expected by Foreman.
- final ChannelPromise closeFuture = new DefaultChannelPromise(null,
executor);
+ final Promise<Void> closeFuture = new DefaultPromise(executor);
final WebSessionResources webSessionResources = new
WebSessionResources(sessionAllocator, remoteAddress,
drillUserSession, closeFuture);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
index c06770e..e678923 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.server.rest;
-import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.Promise;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.ChannelClosedException;
@@ -40,10 +40,10 @@ public class WebSessionResources implements AutoCloseable {
private final UserSession webUserSession;
- private ChannelPromise closeFuture;
+ private Promise<Void> closeFuture;
WebSessionResources(BufferAllocator allocator, SocketAddress remoteAddress,
- UserSession userSession, ChannelPromise closeFuture) {
+ UserSession userSession, Promise<Void> closeFuture) {
this.allocator = allocator;
this.remoteAddress = remoteAddress;
this.webUserSession = userSession;
@@ -58,7 +58,7 @@ public class WebSessionResources implements AutoCloseable {
return allocator;
}
- public ChannelPromise getCloseFuture() {
+ public Promise<Void> getCloseFuture() {
return closeFuture;
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
index c81e5b6..5e5f57e 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
@@ -45,12 +45,8 @@ import java.util.Set;
* access to the {@code UserSession} executing the query. There is no actual
physical
* channel corresponding to this connection wrapper.
*
- * It returns a close future with no actual underlying
- * {@link io.netty.channel.Channel} associated with it but do have an
- * {@code EventExecutor} out of BitServer EventLoopGroup. Since there is no
actual
- * connection established using this class, hence the close event will never be
- * fired by underlying layer and close future is set only when the
- * {@link WebSessionResources} are closed.
+ * It returns a close future which do have an EventExecutor out of BitServer
EventLoopGroup.
+ * Close future is set only when the {@link WebSessionResources} are closed.
*/
public class WebUserConnection extends BaseWebUserConnection {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 38b85b6..64df6c4 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -22,7 +22,6 @@ import org.apache.drill.exec.work.filter.RuntimeFilterRouter;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
-import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -116,7 +115,7 @@ public class Foreman implements Runnable {
private final ResponseSendListener responseListener = new
ResponseSendListener();
private final GenericFutureListener<Future<Void>> closeListener = future ->
cancel();
- private final ChannelFuture closeFuture;
+ private final Future<Void> closeFuture;
private final FragmentsRunner fragmentsRunner;
private final QueryStateProcessor queryStateProcessor;
@@ -141,7 +140,7 @@ public class Foreman implements Runnable {
this.queryRequest = queryRequest;
this.drillbitContext = drillbitContext;
this.initiatingClient = connection;
- this.closeFuture = initiatingClient.getChannelClosureFuture();
+ this.closeFuture = initiatingClient.getClosureFuture();
closeFuture.addListener(closeListener);
// Apply AutoLimit on resultSet (Usually received via REST APIs)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
index 5de560a..9368a54 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.work.prepare;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
import org.apache.drill.common.exceptions.ErrorHelper;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -244,8 +244,8 @@ public class PreparedStatementProvider {
}
@Override
- public ChannelFuture getChannelClosureFuture() {
- return inner.getChannelClosureFuture();
+ public Future<Void> getClosureFuture() {
+ return inner.getClosureFuture();
}
@Override
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
index 0e9e48d..0c5d3c9 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/RestServerTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.server.rest;
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Promise;
import io.netty.channel.local.LocalAddress;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
@@ -30,6 +30,7 @@ import
org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.test.ClusterTest;
+import org.mockito.Mockito;
public class RestServerTest extends ClusterTest {
@@ -55,7 +56,7 @@ public class RestServerTest extends ClusterTest {
.withOptionManager(systemOptions)
.withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName(principal.getName()).build())
.build(),
- new DefaultChannelPromise(null));
+ Mockito.mock(Promise.class));
WebUserConnection connection = new
WebUserConnection.AnonWebUserConnection(webSessionResources);
return new RestQueryRunner(q, cluster.drillbit().getManager(),
connection).run();
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
index 69fa942..56eb84b 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/WebSessionResourcesTest.java
@@ -17,8 +17,8 @@
*/
package org.apache.drill.exec.server.rest;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@@ -33,7 +33,6 @@ import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -65,15 +64,13 @@ public class WebSessionResourcesTest extends BaseTest {
@Test
public void testChannelPromiseWithNullExecutor() throws Exception {
try {
- ChannelPromise closeFuture = new DefaultChannelPromise(null);
+ Promise<Void> closeFuture = new DefaultPromise(null);
webSessionResources = new
WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
(UserSession.class), closeFuture);
webSessionResources.close();
fail();
} catch (Exception e) {
assertTrue(e instanceof NullPointerException);
- verify(webSessionResources.getAllocator()).close();
- verify(webSessionResources.getSession()).close();
}
}
@@ -85,14 +82,12 @@ public class WebSessionResourcesTest extends BaseTest {
public void testChannelPromiseWithValidExecutor() throws Exception {
try {
EventExecutor mockExecutor = mock(EventExecutor.class);
- ChannelPromise closeFuture = new DefaultChannelPromise(null,
mockExecutor);
+ Promise<Void> closeFuture = new DefaultPromise(mockExecutor);
webSessionResources = new
WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
(UserSession.class), closeFuture);
webSessionResources.close();
verify(webSessionResources.getAllocator()).close();
verify(webSessionResources.getSession()).close();
- verify(mockExecutor).inEventLoop();
- verify(mockExecutor).execute(any(Runnable.class));
assertTrue(webSessionResources.getCloseFuture() == null);
assertTrue(!listenerComplete);
} catch (Exception e) {
@@ -107,7 +102,7 @@ public class WebSessionResourcesTest extends BaseTest {
@Test
public void testDoubleClose() throws Exception {
try {
- ChannelPromise closeFuture = new DefaultChannelPromise(null,
mock(EventExecutor.class));
+ Promise<Void> closeFuture = new
DefaultPromise(mock(EventExecutor.class));
webSessionResources = new
WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class), mock
(UserSession.class), closeFuture);
webSessionResources.close();
@@ -134,7 +129,7 @@ public class WebSessionResourcesTest extends BaseTest {
GenericFutureListener<Future<Void>> closeListener = new
TestClosedListener();
latch = new CountDownLatch(1);
executor = TransportCheck.createEventLoopGroup(1, "Test-Thread").next();
- ChannelPromise closeFuture = new DefaultChannelPromise(null, executor);
+ Promise<Void> closeFuture = new DefaultPromise(executor);
// create WebSessionResources with above ChannelPromise to notify
listener
webSessionResources = new
WebSessionResources(mock(BufferAllocator.class), mock(SocketAddress.class),
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 3257d18..1d9e0a0 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -535,7 +535,7 @@
This is likely due to you adding new dependencies to a
java-exec and not updating the excludes in this module. This is important as it
minimizes the size of the dependency of Drill application users.
</message>
- <maxsize>46000000</maxsize>
+ <maxsize>46300000</maxsize>
<minsize>15000000</minsize>
<files>
<file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
@@ -595,7 +595,7 @@
This is likely due to you adding new dependencies to
a java-exec and not updating the excludes in this module. This is important as
it minimizes the size of the dependency of Drill application users.
</message>
- <maxsize>46000000</maxsize>
+ <maxsize>46300000</maxsize>
<minsize>15000000</minsize>
<files>
<file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 9e90d06..c055344 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
@@ -471,12 +472,29 @@ public final class DrillBuf extends AbstractByteBuf
implements AutoCloseable {
}
@Override
+ public ByteBuf touch() {
+ return this;
+ }
+
+ @Override
+ public ByteBuf touch(Object hint) {
+ return this;
+ }
+
+ @Override
public long getLong(int index) {
chk(index, 8);
return PlatformDependent.getLong(addr(index));
}
@Override
+ public long getLongLE(int index) {
+ chk(index, 8);
+ final long var = PlatformDependent.getLong(addr(index));
+ return Long.reverseBytes(var);
+ }
+
+ @Override
public float getFloat(int index) {
return Float.intBitsToFloat(getInt(index));
}
@@ -503,6 +521,13 @@ public final class DrillBuf extends AbstractByteBuf
implements AutoCloseable {
}
@Override
+ public int getIntLE(int index) {
+ chk(index, 4);
+ final int var = PlatformDependent.getInt(addr(index));
+ return Integer.reverseBytes(var);
+ }
+
+ @Override
public int getUnsignedShort(int index) {
return getShort(index) & 0xFFFF;
}
@@ -514,6 +539,28 @@ public final class DrillBuf extends AbstractByteBuf
implements AutoCloseable {
}
@Override
+ public short getShortLE(int index) {
+ chk(index, 2);
+ final short var = PlatformDependent.getShort(addr(index));
+ return Short.reverseBytes(var);
+ }
+
+ @Override
+ public int getUnsignedMedium(int index) {
+ final long addr = addr(index);
+ return (PlatformDependent.getByte(addr) & 0xff) << 16 |
+ (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
+ PlatformDependent.getByte(addr + 2) & 0xff;
+ }
+
+ @Override
+ public int getUnsignedMediumLE(int index) {
+ final long addr = this.addr(index);
+ return PlatformDependent.getByte(addr) & 255 |
+ (Short.reverseBytes(PlatformDependent.getShort(addr + 1L)) &
'\uffff') << 8;
+ }
+
+ @Override
public ByteBuf setShort(int index, int value) {
chk(index, 2);
PlatformDependent.putShort(addr(index), (short) value);
@@ -521,6 +568,31 @@ public final class DrillBuf extends AbstractByteBuf
implements AutoCloseable {
}
@Override
+ public ByteBuf setShortLE(int index, int value) {
+ chk(index, 2);
+ PlatformDependent.putShort(addr(index), Short.reverseBytes((short)value));
+ return this;
+ }
+
+ @Override
+ public ByteBuf setMedium(int index, int value) {
+ chk(index, 3);
+ long addr = this.addr(index);
+ PlatformDependent.putByte(addr, (byte)(value >>> 16));
+ PlatformDependent.putShort(addr + 1L, (short)value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setMediumLE(int index, int value) {
+ chk(index, 3);
+ long addr = this.addr(index);
+ PlatformDependent.putByte(addr, (byte)value);
+ PlatformDependent.putShort(addr + 1L, Short.reverseBytes((short)(value >>>
8)));
+ return this;
+ }
+
+ @Override
public ByteBuf setInt(int index, int value) {
chk(index, 4);
PlatformDependent.putInt(addr(index), value);
@@ -528,6 +600,13 @@ public final class DrillBuf extends AbstractByteBuf
implements AutoCloseable {
}
@Override
+ public ByteBuf setIntLE(int index, int value) {
+ chk(index, 4);
+ PlatformDependent.putInt(addr(index), Integer.reverseBytes(value));
+ return this;
+ }
+
+ @Override
public ByteBuf setLong(int index, long value) {
chk(index, 8);
PlatformDependent.putLong(addr(index), value);
@@ -535,6 +614,13 @@ public final class DrillBuf extends AbstractByteBuf
implements AutoCloseable {
}
@Override
+ public ByteBuf setLongLE(int index, long value) {
+ chk(index, 4);
+ PlatformDependent.putLong(addr(index), Long.reverseBytes(value));
+ return this;
+ }
+
+ @Override
public ByteBuf setChar(int index, int value) {
chk(index, 2);
PlatformDependent.putShort(addr(index), (short) value);
@@ -643,16 +729,31 @@ public final class DrillBuf extends AbstractByteBuf
implements AutoCloseable {
}
@Override
+ protected short _getShortLE(int index) {
+ return getShortLE(index);
+ }
+
+ @Override
protected int _getInt(int index) {
return getInt(index);
}
@Override
+ protected int _getIntLE(int index) {
+ return getIntLE(index);
+ }
+
+ @Override
protected long _getLong(int index) {
return getLong(index);
}
@Override
+ protected long _getLongLE(int index) {
+ return getLongLE(index);
+ }
+
+ @Override
protected void _setByte(int index, int value) {
setByte(index, value);
}
@@ -663,21 +764,41 @@ public final class DrillBuf extends AbstractByteBuf
implements AutoCloseable {
}
@Override
+ protected void _setShortLE(int index, int value) {
+ setShortLE(index, value);
+ }
+
+ @Override
protected void _setMedium(int index, int value) {
setMedium(index, value);
}
@Override
+ protected void _setMediumLE(int index, int value) {
+ setMediumLE(index, value);
+ }
+
+ @Override
protected void _setInt(int index, int value) {
setInt(index, value);
}
@Override
+ protected void _setIntLE(int index, int value) {
+ setIntLE(index, value);
+ }
+
+ @Override
protected void _setLong(int index, long value) {
setLong(index, value);
}
@Override
+ protected void _setLongLE(int index, long value) {
+ setLongLE(index, value);
+ }
+
+ @Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
final int BULK_COPY_THR = 16;
// Performance profiling indicated that using the "putByte()" method is
faster for short
@@ -703,11 +824,18 @@ public final class DrillBuf extends AbstractByteBuf
implements AutoCloseable {
}
@Override
+ public int getBytes(int index, FileChannel out, long position, int length)
throws IOException {
+ return udle.getBytes(index + offset, out, position, length);
+ }
+
+ @Override
protected int _getUnsignedMedium(int index) {
- final long addr = addr(index);
- return (PlatformDependent.getByte(addr) & 0xff) << 16 |
- (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
- PlatformDependent.getByte(addr + 2) & 0xff;
+ return getUnsignedMedium(index);
+ }
+
+ @Override
+ protected int _getUnsignedMediumLE(int index) {
+ return getUnsignedMediumLE(index);
}
@Override
@@ -763,6 +891,11 @@ public final class DrillBuf extends AbstractByteBuf
implements AutoCloseable {
}
@Override
+ public int setBytes(int index, FileChannel in, long position, int length)
throws IOException {
+ return udle.setBytes(index + offset, in, position, length);
+ }
+
+ @Override
public byte getByte(int index) {
chk(index, 1);
return PlatformDependent.getByte(addr(index));
diff --git
a/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
b/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
index d69d17b..d29adfa 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
@@ -127,6 +128,11 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
+ protected short _getShortLE(int index) {
+ return buffer.getShortLE(index);
+ }
+
+ @Override
public int getUnsignedMedium(int index) {
return _getUnsignedMedium(index);
}
@@ -137,6 +143,11 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
+ protected int _getUnsignedMediumLE(int index) {
+ return buffer.getUnsignedMediumLE(index);
+ }
+
+ @Override
public int getInt(int index) {
return _getInt(index);
}
@@ -147,6 +158,11 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
+ protected int _getIntLE(int index) {
+ return buffer.getIntLE(index);
+ }
+
+ @Override
public long getLong(int index) {
return _getLong(index);
}
@@ -157,6 +173,11 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
+ protected long _getLongLE(int index) {
+ return buffer.getLongLE(index);
+ }
+
+ @Override
public abstract ByteBuf copy(int index, int length);
@Override
@@ -205,6 +226,11 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
+ protected void _setShortLE(int index, int value) {
+ buffer.setShortLE(index, value);
+ }
+
+ @Override
public ByteBuf setMedium(int index, int value) {
_setMedium(index, value);
return this;
@@ -216,6 +242,11 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
+ protected void _setMediumLE(int index, int value) {
+ buffer.setMediumLE(index, value);
+ }
+
+ @Override
public ByteBuf setInt(int index, int value) {
_setInt(index, value);
return this;
@@ -227,6 +258,11 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
+ protected void _setIntLE(int index, int value) {
+ buffer.setIntLE(index, value);
+ }
+
+ @Override
public ByteBuf setLong(int index, long value) {
_setLong(index, value);
return this;
@@ -238,6 +274,11 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
+ protected void _setLongLE(int index, long value) {
+ buffer.setLongLE(index, value);
+ }
+
+ @Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
buffer.setBytes(index, src, srcIndex, length);
return this;
@@ -269,6 +310,11 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
+ public int getBytes(int index, FileChannel out, long position, int length)
throws IOException {
+ return buffer.getBytes(index, out, position, length);
+ }
+
+ @Override
public int setBytes(int index, InputStream in, int length)
throws IOException {
return buffer.setBytes(index, in, length);
@@ -281,6 +327,11 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
+ public int setBytes(int index, FileChannel in, long position, int length)
throws IOException {
+ return buffer.setBytes(index, in, position, length);
+ }
+
+ @Override
public int nioBufferCount() {
return buffer.nioBufferCount();
}
@@ -296,23 +347,25 @@ abstract class MutableWrappedByteBuf extends
AbstractByteBuf {
}
@Override
- public int forEachByte(int index, int length, ByteBufProcessor processor) {
- return buffer.forEachByte(index, length, processor);
+ public final int refCnt() {
+ return unwrap().refCnt();
}
@Override
- public int forEachByteDesc(int index, int length, ByteBufProcessor
processor) {
- return buffer.forEachByteDesc(index, length, processor);
+ public final ByteBuf retain() {
+ unwrap().retain();
+ return this;
}
@Override
- public final int refCnt() {
- return unwrap().refCnt();
+ public ByteBuf touch() {
+ buffer.touch();
+ return this;
}
@Override
- public final ByteBuf retain() {
- unwrap().retain();
+ public ByteBuf touch(Object hint) {
+ buffer.touch(hint);
return this;
}
diff --git
a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index 1e70216..bee3beb 100644
---
a/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++
b/exec/memory/base/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -68,6 +68,14 @@ public class PooledByteBufAllocatorL {
}
}
+ public ByteBuf allocateHeap(int initialCapacity, int maxCapacity) {
+ try {
+ return allocator.heapBuffer(initialCapacity, maxCapacity);
+ } catch (OutOfMemoryError e) {
+ throw new OutOfMemoryException("Failure allocating heap buffer.", e);
+ }
+ }
+
public int getChunkSize() {
return allocator.chunkSize;
}
@@ -201,12 +209,6 @@ public class PooledByteBufAllocatorL {
return newDirectBufferL(initialCapacity, maxCapacity);
}
- @Override
- public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
- throw new UnsupportedOperationException(
- "Drill doesn't support using heap buffers.");
- }
-
private void validate(int initialCapacity, int maxCapacity) {
if (initialCapacity < 0) {
throw new IllegalArgumentException(
diff --git
a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index f4d6b81..c6bc769 100644
---
a/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++
b/exec/memory/base/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -57,7 +57,7 @@ public final class UnsafeDirectLittleEndian extends
WrappedByteBuf {
private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake,
AtomicLong bufferCount, AtomicLong bufferSize) {
super(buf);
- if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
+ if (!NATIVE_ORDER) {
throw new IllegalStateException("Drill only runs on LittleEndian
systems.");
}
diff --git
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
index 33325a5..81efdf0 100644
---
a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
+++
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/DrillByteBufAllocator.java
@@ -17,22 +17,28 @@
*/
package org.apache.drill.exec.memory;
+import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.ExpandableByteBuf;
+import io.netty.buffer.PooledByteBufAllocatorL;
/**
- * An implementation of ByteBufAllocator that wraps a Drill BufferAllocator.
This allows the RPC layer to be accounted
+ * An extend of AbstractByteBufAllocator that wraps a Drill BufferAllocator.
This allows the RPC layer to be accounted
* and managed using Drill's BufferAllocator infrastructure. The only thin
different from a typical BufferAllocator is
* the signature and the fact that this Allocator returns ExpandableByteBufs
which enable otherwise non-expandable
* DrillBufs to be expandable.
+ *
+ * Beside it, DrillByteBufAllocator uses
PooledByteBufAllocatorL.InnerAllocator as allocator only for heapBuffer's for
+ * netty's purposes, when it directly calls heapBuffer methods.
*/
-public class DrillByteBufAllocator implements ByteBufAllocator {
+public class DrillByteBufAllocator extends AbstractByteBufAllocator {
private static final int DEFAULT_BUFFER_SIZE = 4096;
private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16;
+ // used to let netty properly work in case when it directly calls heapBuffer
methods
+ private static final PooledByteBufAllocatorL HEAP_ALLOCATOR =
AllocationManager.INNER_ALLOCATOR;
private final BufferAllocator allocator;
public DrillByteBufAllocator(BufferAllocator allocator) {
@@ -105,37 +111,28 @@ public class DrillByteBufAllocator implements
ByteBufAllocator {
}
@Override
- public boolean isDirectBufferPooled() {
- return false;
- }
-
- @Override
- public ByteBuf heapBuffer() {
- throw fail();
+ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+ return HEAP_ALLOCATOR.allocateHeap(initialCapacity, maxCapacity);
}
@Override
- public ByteBuf heapBuffer(int initialCapacity) {
- throw fail();
+ protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+ return buffer(initialCapacity, maxCapacity);
}
@Override
- public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
- throw fail();
+ public boolean isDirectBufferPooled() {
+ return false;
}
@Override
public CompositeByteBuf compositeHeapBuffer() {
- throw fail();
+ return compositeHeapBuffer(DEFAULT_MAX_COMPOSITE_COMPONENTS);
}
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
- throw fail();
- }
-
- private RuntimeException fail() {
- throw new UnsupportedOperationException("Allocator doesn't support
heap-based memory.");
+ return new CompositeByteBuf(this, false, maxNumComponents);
}
}
diff --git a/pom.xml b/pom.xml
index 1bd1dac..28dd9d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,7 +113,7 @@
<surefire.version>3.0.0-M4</surefire.version>
<commons.compress.version>1.20</commons.compress.version>
<hikari.version>3.4.2</hikari.version>
- <netty.version>4.0.48.Final</netty.version>
+ <netty.version>4.1.59.Final</netty.version>
<httpclient.version>4.5.12</httpclient.version>
<libthrift.version>0.13.0</libthrift.version>
<derby.version>10.14.2.0</derby.version>
@@ -1822,6 +1822,10 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>