Khurram Faraaz created DRILL-3771:
-------------------------------------

             Summary: MEMORY LEAK : Concurrent query execution
                 Key: DRILL-3771
                 URL: https://issues.apache.org/jira/browse/DRILL-3771
             Project: Apache Drill
          Issue Type: Bug
          Components: Execution - Flow
    Affects Versions: 1.2.0
         Environment: 4 node cluster CentOS
            Reporter: Khurram Faraaz
            Assignee: Deneche A. Hakim
            Priority: Critical



I am seeing a memory leak when I execute concurrent queries (16 threads). The JSON file contains roughly 26M records in total, of which 1,874,177 match the predicate key2 = 'm'.

However, I do not see the memory leak reported in drillbit.log.

On the Web UI's query profiles page, the STATE of each query is listed as CANCELLATION_REQUESTED.

master commit ID: b525692e
Query : select key1 , key2 from `twoKeyJsn.json` where key2 = 'm';

This is printed on the console from which I run the Java program:

{code}
org.apache.drill.jdbc.AlreadyClosedSqlException: Connection is already closed.
        at org.apache.drill.jdbc.impl.DrillConnectionImpl.checkNotClosed(DrillConnectionImpl.java:150)
        at org.apache.drill.jdbc.impl.DrillConnectionImpl.createStatement(DrillConnectionImpl.java:331)
        at org.apache.drill.jdbc.impl.DrillConnectionImpl.createStatement(DrillConnectionImpl.java:61)
        at net.hydromatic.avatica.AvaticaConnection.createStatement(AvaticaConnection.java:91)
        at net.hydromatic.avatica.AvaticaConnection.createStatement(AvaticaConnection.java:30)
        at ConcurrencyTest.executeQuery(ConcurrencyTest.java:43)
        at ConcurrencyTest.selectData(ConcurrencyTest.java:33)
        at ConcurrencyTest.run(ConcurrencyTest.java:23)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
java.sql.SQLException: While closing connection
        at net.hydromatic.avatica.Helper.createException(Helper.java:40)
        at net.hydromatic.avatica.AvaticaConnection.close(AvaticaConnection.java:137)
        at ConcurrencyTest.executeQuery(ConcurrencyTest.java:52)
        at ConcurrencyTest.selectData(ConcurrencyTest.java:33)
        at ConcurrencyTest.run(ConcurrencyTest.java:23)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.IllegalStateException: Failure while closing accountor.  Expected private and shared pools to be set to initial values.  However, one or more were not.  Stats are
        zone    init    allocated       delta 
        private 0       0       0 
        shared  11246501888     11246497280     4608.
        at org.apache.drill.exec.memory.AtomicRemainder.close(AtomicRemainder.java:200)
        at org.apache.drill.exec.memory.Accountor.close(Accountor.java:390)
        at org.apache.drill.exec.memory.TopLevelAllocator.close(TopLevelAllocator.java:187)
        at org.apache.drill.exec.client.DrillClient.close(DrillClient.java:261)
        at org.apache.drill.jdbc.impl.DrillConnectionImpl.cleanup(DrillConnectionImpl.java:377)
        at org.apache.drill.jdbc.impl.DrillHandler.onConnectionClose(DrillHandler.java:36)
        at net.hydromatic.avatica.AvaticaConnection.close(AvaticaConnection.java:135)
        ... 8 more

{code}
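The stats table in the `Caused by` message reads like an invariant check: when the client's allocator is closed, each pool is expected to be back at its initial value, and the delta column is the difference (for the shared pool, 11246501888 - 11246497280 = 4608 bytes). The sketch below models only that reading of the message; `PoolCloseCheck`, `outstanding` and `checkOnClose` are hypothetical names, and this is not Drill's `AtomicRemainder` implementation.

```java
// Simplified, hypothetical model of the check behind "Failure while closing accountor".
// It is NOT Drill's AtomicRemainder code; it only mirrors how the stats table reads.
public final class PoolCloseCheck {

    /** Difference between a pool's initial value and its value at close time. */
    static long outstanding(long init, long current) {
        return init - current;
    }

    /** Throws if the pool did not return to its initial value, as the error message describes. */
    static void checkOnClose(String zone, long init, long current) {
        long delta = outstanding(init, current);
        if (delta != 0) {
            throw new IllegalStateException(
                    "zone " + zone + " not at its initial value, delta = " + delta);
        }
    }

    public static void main(String[] args) {
        // "shared" row of the stats table in the client-side stack trace.
        long init = 11_246_501_888L;
        long current = 11_246_497_280L;
        System.out.println(outstanding(init, current)); // prints 4608

        checkOnClose("private", 0L, 0L); // "private" row: delta 0, no exception
        try {
            checkOnClose("shared", init, current);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // zone shared not at its initial value, delta = 4608
        }
    }
}
```

In this model a non-zero delta at close time means some buffer was still accounted to the pool when the connection was torn down, which matches the report that the client closes while queries are still being cancelled.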

From drillbit.log:

{code}
2015-09-12 02:32:04,709 [BitServer-4] INFO  o.a.d.e.w.fragment.FragmentExecutor - 2a0c71c7-9adc-2222-2a97-f2f218f5b7a2:0:0: State change requested RUNNING --> CANCELLATION_REQUESTED
2015-09-12 02:32:04,709 [BitServer-4] INFO  o.a.d.e.w.f.FragmentStatusReporter - 2a0c71c7-9adc-2222-2a97-f2f218f5b7a2:0:0: State to report: CANCELLATION_REQUESTED
2015-09-12 02:32:04,720 [UserServer-1] ERROR o.a.d.exec.rpc.RpcExceptionHandler - Exception in RPC communication.  Connection: /10.10.100.201:31010 <--> /10.10.100.201:53620 (user client).  Closing connection.
java.io.IOException: syscall:writev(...)() failed: Broken pipe

...

2015-09-12 02:32:04,896 [UserServer-1] INFO  o.a.d.e.w.fragment.FragmentExecutor - 2a0c71c8-76f3-fda0-f1c0-fe8b0e80471c:0:0: State change requested CANCELLATION_REQUESTED --> FAILED
2015-09-12 02:32:04,898 [UserServer-1] WARN  o.a.d.exec.rpc.RpcExceptionHandler - Exception occurred with closed channel.  Connection: /10.10.100.201:31010 <--> /10.10.100.201:53620 (user client)
io.netty.handler.codec.EncoderException: RpcEncoder must produce at least one message.
        at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:98) ~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:705) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
        at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:980) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
        at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1032) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:965) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) [netty-common-4.0.27.Final.jar:4.0.27.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:254) [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) [netty-common-4.0.27.Final.jar:4.0.27.Final]
        at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]

{code}
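With 16 concurrent queries, it can help to list every fragment state change mechanically instead of reading the log by eye. A minimal sketch, assuming each entry sits on a single line (as in drillbit.log itself; the entries above were wrapped by mail) and that query IDs are lowercase hex UUIDs as shown; `FragmentStateScan` and `transitions` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts fragment state transitions from drillbit.log lines.
// Assumes one log entry per line and lowercase hex query IDs, as in the excerpt above.
public final class FragmentStateScan {

    // <query id>:<major>:<minor>: State change requested <FROM> --> <TO>
    private static final Pattern TRANSITION = Pattern.compile(
            "([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}):(\\d+):(\\d+): "
                    + "State change requested (\\w+) --> (\\w+)");

    /** Returns "queryId:major:minor FROM -> TO" for every matching line, in order. */
    static List<String> transitions(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            Matcher m = TRANSITION.matcher(line);
            if (m.find()) {
                out.add(m.group(1) + ":" + m.group(2) + ":" + m.group(3)
                        + " " + m.group(4) + " -> " + m.group(5));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2015-09-12 02:32:04,709 [BitServer-4] INFO  o.a.d.e.w.fragment.FragmentExecutor - "
                        + "2a0c71c7-9adc-2222-2a97-f2f218f5b7a2:0:0: State change requested RUNNING --> CANCELLATION_REQUESTED",
                "2015-09-12 02:32:04,709 [BitServer-4] INFO  o.a.d.e.w.f.FragmentStatusReporter - "
                        + "2a0c71c7-9adc-2222-2a97-f2f218f5b7a2:0:0: State to report: CANCELLATION_REQUESTED",
                "2015-09-12 02:32:04,896 [UserServer-1] INFO  o.a.d.e.w.fragment.FragmentExecutor - "
                        + "2a0c71c8-76f3-fda0-f1c0-fe8b0e80471c:0:0: State change requested CANCELLATION_REQUESTED --> FAILED");
        // The "State to report" line does not match; only the two state changes are printed.
        for (String t : transitions(lines)) {
            System.out.println(t);
        }
    }
}
```

For the full log, the list from `java.nio.file.Files.readAllLines(path, StandardCharsets.UTF_8)` could be passed to `transitions()` and the results grouped by query ID.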

Output of sys.memory before the concurrent queries were executed:
{code}
0: jdbc:drill:schema=dfs.tmp> select * from sys.memory
. . . . . . . . . . . . . . > ;
+-------------------+------------+---------------+-------------+-----------------+---------------------+-------------+
|     hostname      | user_port  | heap_current  |  heap_max   | direct_current  | jvm_direct_current  | direct_max  |
+-------------------+------------+---------------+-------------+-----------------+---------------------+-------------+
| centos-01.qa.lab  | 31010      | 467280040     | 4294967296  | 12799111        | 134231174           | 8589934592  |
| centos-03.qa.lab  | 31010      | 299898648     | 4294967296  | 8750365         | 50345094            | 8589934592  |
| centos-04.qa.lab  | 31010      | 241172480     | 4294967296  | 8750365         | 50345094            | 8589934592  |
| centos-02.qa.lab  | 31010      | 300100088     | 4294967296  | 8750365         | 50345094            | 8589934592  |
+-------------------+------------+---------------+-------------+-----------------+---------------------+-------------+
4 rows selected (1.178 seconds)
{code}

Output of sys.memory after the concurrent queries were executed:

{code}
0: jdbc:drill:schema=dfs.tmp> select * from sys.memory;
+-------------------+------------+---------------+-------------+-----------------+---------------------+-------------+
|     hostname      | user_port  | heap_current  |  heap_max   | direct_current  | jvm_direct_current  | direct_max  |
+-------------------+------------+---------------+-------------+-----------------+---------------------+-------------+
| centos-01.qa.lab  | 31010      | 2730085456    | 4294967296  | 103535261       | 402814086           | 8589934592  |
| centos-03.qa.lab  | 31010      | 329258776     | 4294967296  | 8750365         | 100676742           | 8589934592  |
| centos-04.qa.lab  | 31010      | 274726912     | 4294967296  | 8750365         | 100676742           | 8589934592  |
| centos-02.qa.lab  | 31010      | 335751672     | 4294967296  | 8750377         | 100676742           | 8589934592  |
+-------------------+------------+---------------+-------------+-----------------+---------------------+-------------+
4 rows selected (0.19 seconds)
{code}
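The two sys.memory snapshots are easier to compare as per-node differences. The sketch below hard-codes the direct_current and jvm_direct_current values from the two tables and subtracts them; the heap columns are left out because heap usage also depends on garbage-collection timing. `SysMemoryDelta` and `delta` are hypothetical names, and the result only compares these two snapshots.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: per-node growth of the direct-memory columns of sys.memory,
// using the before/after values copied from the two tables above.
public final class SysMemoryDelta {

    // Each entry: {direct_current, jvm_direct_current} in bytes.
    static final Map<String, long[]> BEFORE = new LinkedHashMap<>();
    static final Map<String, long[]> AFTER = new LinkedHashMap<>();

    static {
        BEFORE.put("centos-01.qa.lab", new long[] {12_799_111L, 134_231_174L});
        BEFORE.put("centos-03.qa.lab", new long[] {8_750_365L, 50_345_094L});
        BEFORE.put("centos-04.qa.lab", new long[] {8_750_365L, 50_345_094L});
        BEFORE.put("centos-02.qa.lab", new long[] {8_750_365L, 50_345_094L});

        AFTER.put("centos-01.qa.lab", new long[] {103_535_261L, 402_814_086L});
        AFTER.put("centos-03.qa.lab", new long[] {8_750_365L, 100_676_742L});
        AFTER.put("centos-04.qa.lab", new long[] {8_750_365L, 100_676_742L});
        AFTER.put("centos-02.qa.lab", new long[] {8_750_377L, 100_676_742L});
    }

    /** Returns {direct_current delta, jvm_direct_current delta} in bytes for one host. */
    static long[] delta(String host) {
        long[] before = BEFORE.get(host);
        long[] after = AFTER.get(host);
        return new long[] {after[0] - before[0], after[1] - before[1]};
    }

    public static void main(String[] args) {
        for (String host : BEFORE.keySet()) {
            long[] d = delta(host);
            System.out.printf("%s direct_current delta=%d jvm_direct_current delta=%d%n",
                    host, d[0], d[1]);
        }
    }
}
```

With these values, centos-01.qa.lab's direct_current grows by 90,736,150 bytes and its jvm_direct_current by 268,582,912 bytes; on the other three nodes direct_current changes by at most 12 bytes, while jvm_direct_current grows by 50,331,648 bytes (48 MiB) on each.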

To reproduce the leak, run the following class:

{code}
import org.apache.log4j.Logger;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.sql.*;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrencyTest implements Runnable {

    Connection conn = null;

    ConcurrencyTest(Connection conn) {
        this.conn = conn;
    }

    public void run() {
        try {
            selectData();
        } catch (Exception e) {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
    }

    // SELECT data 
    public void selectData() {
        try {
            executeQuery("SELECT key1 , key2 FROM `twoKeyJsn.json` where key2 = 'm'");
        } catch(Exception e) {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
    }

    // Execute Query
    public void executeQuery(String query) {
        try {
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(query);

            while (rs.next()) {
                // do nothing.
            }
            if (rs != null)
                rs.close();
            stmt.close();
            conn.close(); // closes the one Connection that main() shares with every task
        } catch (Exception e) {
            System.out.println(e.getMessage());
            e.printStackTrace();

        }
    }

    public static void main(String s[]) throws Exception {

        final String URL_STRING = "jdbc:drill:schema=dfs.tmp;drillbit=10.10.100.201";
        Class.forName("org.apache.drill.jdbc.Driver").newInstance();
        Connection conn = DriverManager.getConnection(URL_STRING,"","");

        ExecutorService executor = Executors.newFixedThreadPool(16);
        try {
            for (int i = 1; i <= 100; i++) {
                executor.submit(new ConcurrencyTest(conn));
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
        executor.shutdown(); // accept no new tasks; pool threads exit once the submitted tasks finish
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
