[
https://issues.apache.org/jira/browse/DRILL-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237777#comment-15237777
]
ASF GitHub Bot commented on DRILL-3714:
---------------------------------------
Github user jacques-n commented on a diff in the pull request:
https://github.com/apache/drill/pull/463#discussion_r59434232
--- Diff:
exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java ---
@@ -20,51 +20,82 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import com.carrotsearch.hppc.IntObjectHashMap;
+import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+import com.google.common.base.Preconditions;
+
/**
- * Manages the creation of rpc futures for a particular socket.
+ * Manages the creation of rpc futures for a particular socket <--> socket
+ * connection. Generally speaking, there will be two threads working with
this
+ * class (the socket thread and the Request generating thread).
Synchronization
+ * is simple with the map being the only thing that is protected.
Everything
+ * else works via Atomic variables.
*/
-public class CoordinationQueue {
- static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
+class RequestIdMap {
+ static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(RequestIdMap.class);
+
+ private final AtomicInteger value = new AtomicInteger();
+ private final AtomicBoolean acceptMessage = new AtomicBoolean(true);
- private final PositiveAtomicInteger circularInt = new
PositiveAtomicInteger();
- private final Map<Integer, RpcOutcome<?>> map;
+ /** Access to map must be protected. **/
+ private final IntObjectHashMap<RpcOutcome<?>> map;
- public CoordinationQueue(int segmentSize, int segmentCount) {
- map = new ConcurrentHashMap<Integer, RpcOutcome<?>>(segmentSize,
0.75f, segmentCount);
+ public RequestIdMap() {
+ map = new IntObjectHashMap<RpcOutcome<?>>();
}
void channelClosed(Throwable ex) {
+ acceptMessage.set(false);
if (ex != null) {
- RpcException e;
- if (ex instanceof RpcException) {
- e = (RpcException) ex;
- } else {
- e = new RpcException(ex);
+ final RpcException e = RpcException.mapException(ex);
+ synchronized (map) {
+ map.forEach(new Closer(e));
+ map.clear();
}
- for (RpcOutcome<?> f : map.values()) {
- f.setException(e);
+ }
+ }
+
+ private class Closer implements IntObjectProcedure<RpcOutcome<?>> {
+ final RpcException exception;
+
+ public Closer(RpcException exception) {
+ this.exception = exception;
+ }
+
+ @Override
+ public void apply(int key, RpcOutcome<?> value) {
+ try{
+ value.setException(exception);
+ }catch(Exception e){
+ logger.warn("Failure while attempting to fail rpc response.", e);
}
}
+
}
- public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V>
handler, Class<V> clazz, RemoteConnection connection) {
- int i = circularInt.getNext();
+ public <V> ChannelListenerWithCoordinationId
createNewRpcListener(RpcOutcomeListener<V> handler, Class<V> clazz,
+ RemoteConnection connection) {
+ int i = value.incrementAndGet();
RpcListener<V> future = new RpcListener<V>(handler, clazz, i,
connection);
- Object old = map.put(i, future);
- if (old != null) {
- throw new IllegalStateException(
- "You attempted to reuse a coordination id when the previous
coordination id has not been removed. This is likely rpc future callback
memory leak.");
+ final Object old;
+ synchronized (map) {
+ Preconditions.checkArgument(acceptMessage.get(),
--- End diff --
I prefer to spend as little time in the synchronized block as possible. If
we move this check up, we still need to repeat it inside the synchronized block
(i.e., we could make this double-checked).
> Query runs out of memory and remains in CANCELLATION_REQUESTED state until
> drillbit is restarted
> ------------------------------------------------------------------------------------------------
>
> Key: DRILL-3714
> URL: https://issues.apache.org/jira/browse/DRILL-3714
> Project: Apache Drill
> Issue Type: Bug
> Components: Execution - Flow
> Affects Versions: 1.2.0
> Reporter: Victoria Markman
> Assignee: Jacques Nadeau
> Priority: Critical
> Fix For: 1.7.0
>
> Attachments: Screen Shot 2015-08-26 at 10.36.33 AM.png, drillbit.log,
> jstack.txt, query_profile_2a2210a7-7a78-c774-d54c-c863d0b77bb0.json
>
>
> This is a variation of DRILL-3705; the difference is in Drill's behavior when
> it hits an OOM condition.
> The query runs out of memory during execution and remains in the
> "CANCELLATION_REQUESTED" state until the drillbit is bounced.
> The client (sqlline in this case) never gets a response from the server.
> Reproduction details:
> Single node drillbit installation.
> DRILL_MAX_DIRECT_MEMORY="8G"
> DRILL_HEAP="4G"
> Run this query on the TPCDS SF100 data set:
> {code}
> SELECT SUM(ss.ss_net_paid_inc_tax) OVER (PARTITION BY ss.ss_store_sk) AS
> TotalSpend FROM store_sales ss WHERE ss.ss_store_sk IS NOT NULL ORDER BY 1
> LIMIT 10;
> {code}
> drillbit.log
> {code}
> 2015-08-26 16:54:58,469 [2a2210a7-7a78-c774-d54c-c863d0b77bb0:frag:3:22] INFO
> o.a.d.e.w.f.FragmentStatusReporter -
> 2a2210a7-7a78-c774-d54c-c863d0b77bb0:3:22: State to report: RUNNING
> 2015-08-26 16:55:50,498 [BitServer-5] WARN
> o.a.drill.exec.rpc.data.DataServer - Message of mode REQUEST of rpc type 3
> took longer than 500ms. Actual duration was 2569ms.
> 2015-08-26 16:56:31,086 [BitServer-5] ERROR
> o.a.d.exec.rpc.RpcExceptionHandler - Exception in RPC communication.
> Connection: /10.10.88.133:31012 <--> /10.10.88.133:54554 (data server).
> Closing connection.
> io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct
> buffer memory
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:233)
> ~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
> [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
> at
> io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
> [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
> at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
> [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> [netty-common-4.0.27.Final.jar:4.0.27.Final]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:658) ~[na:1.7.0_71]
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> ~[na:1.7.0_71]
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
> ~[na:1.7.0_71]
> at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:437)
> ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
> at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:179)
> ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:168)
> ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
> at io.netty.buffer.PoolArena.reallocate(PoolArena.java:280)
> ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
> at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:110)
> ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
> ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
> ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
> ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
> ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
> at io.netty.buffer.WrappedByteBuf.writeBytes(WrappedByteBuf.java:600)
> ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.buffer.UnsafeDirectLittleEndian.writeBytes(UnsafeDirectLittleEndian.java:28)
> ~[drill-java-exec-1.2.0-SNAPSHOT.jar:4.0.27.Final]
> at
> io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
> ~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:227)
> ~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
> ... 11 common frames omitted
> 2015-08-26 16:56:31,087 [BitServer-5] INFO
> o.a.d.exec.rpc.ProtobufLengthDecoder - Channel is closed, discarding
> remaining 124958 byte(s) in buffer.
> 2015-08-26 16:56:31,087 [BitClient-1] ERROR
> o.a.d.exec.rpc.RpcExceptionHandler - Exception in RPC communication.
> Connection: /10.10.88.133:54554 <--> /10.10.88.133:31012 (data client).
> Closing connection.
> java.io.IOException: syscall:read(...)() failed: Connection reset by peer
> 2015-08-26 16:56:31,088 [BitClient-1] INFO
> o.a.drill.exec.rpc.data.DataClient - Channel closed /10.10.88.133:54554 <-->
> /10.10.88.133:31012.
> 2015-08-26 16:56:35,325 [BitServer-6] ERROR
> o.a.d.exec.rpc.RpcExceptionHandler - Exception in RPC communication.
> Connection: /10.10.88.133:31012 <--> /10.10.88.133:54555 (data server).
> Closing connection.
> io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct
> buffer memory
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:233)
> ~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> [netty-transport-4.0.27.Final.jar:4.0.27.Final]
> at
> io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
> [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
> at
> io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
> [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
> at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
> [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> [netty-common-4.0.27.Final.jar:4.0.27.Final]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> {code}
> Attached:
> query_profile_2a2210a7-7a78-c774-d54c-c863d0b77bb0.json
> drillbit.log
> jstack.txt
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)