Hi all,

It took a while for us to convince ourselves that this wasn't an
application problem. I am attaching a test case that reliably
reproduces the dead socket problem on some systems. The flow is
essentially the same as the networking code in our messaging
system.

I had the best luck reproducing this on Dell PowerEdge 2970s (two
socket AMD) running CentOS 5.3. I dual booted two of them with
Ubuntu server 9.04 and have not succeeded in reproducing the
problem with Ubuntu. I was not able to reproduce the problem on
the Dell R610 (2 socket Nehalem) machines running CentOS 5.3 with
the test application, although the actual app (messaging system)
does have this issue on the R610s.

I am very interested in hearing about what happens when other
people run it. I am also interested in confirming that this is a
sane use of Selectors, SocketChannels, and SelectionKeys.
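
If you want to try it: compile the attached TCPThroughput.java and
start the server on one machine and the client on another, roughly
like this (the host name is a placeholder; the defaults are port
29600 and 60 seconds of sending):

  javac TCPThroughput.java
  java -ea TCPThroughput server
  java -ea TCPThroughput address=<server-host> seconds=60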

Thanks,
Ariel Weisberg

On Wed, 15 Jul 2009 14:24 -0700, "Martin Buchholz"
<[email protected]> wrote:

  In summary,
  there are two different bugs at work here,
  and neither of them is in LBD.
  The hotspot team is working on the LBD deadlock.
  (As always) It would be good to have a good test case for
  the dead socket problem.
  Martin

On Wed, Jul 15, 2009 at 12:24, Ariel Weisberg
<[1][email protected]> wrote:

Hi,

I have found that there are two different failure modes without
involving -XX:+UseMembar. There is the LBD deadlock and then
there is the dead socket in between two nodes. Either failure can
occur with the same code and settings. It appears that the dead
socket problem is more common. The LBD failure is also not
correlated with any specific LBD (I originally saw it only with the
LBD for an Initiator's mailbox).

With -XX:+UseMembar the system is noticeably more reliable and
tends to run much longer without failing (although it can still
fail immediately). When it does fail it has been due to a dead
connection. I have not reproduced a deadlock on an LBD with
-XX:+UseMembar.

I also found that the dead socket issue was reproducible (twice) on
Dell PowerEdge 2970s (two socket AMD). It takes an hour or so to
reproduce the dead socket problem on the 2970s. I have not
recreated the LBD issue on them, although given how long the
socket issue takes to reproduce it may be that I have not run them
long enough. On the AMD machines I did not use -XX:+UseMembar.

Ariel


On Mon, 13 Jul 2009 18:59 -0400, "Ariel Weisberg"
<[2][email protected]> wrote:

Hi all.

Sorry Martin, I missed reading your last email. I am not confident
that I will get a small reproducible test case in a reasonable
time frame. Reproducing it with the application is easy and I
will see what I can do about getting the source available.

One interesting thing I can tell you is that if I remove the
LinkedBlockingDeque from the mailbox of the Initiator the system
still deadlocks. The cluster has a TCP mesh topology so any node
can deliver messages to any other node. One of the connections
goes dead and neither side detects that there is a problem. I added
some assertions to the network selection thread to check that all
the connections in the cluster are still healthy and that they have
the correct interest set.

Here is what it checks to make sure each connection is working:
>    for (ForeignHost.Port port : foreignHostPorts) {
>        assert(port.m_selectionKey.isValid());
>        assert(port.m_selectionKey.selector() == m_selector);
>        assert(port.m_channel.isOpen());
>        assert(((SocketChannel)port.m_channel).isConnected());
>        assert(((SocketChannel)port.m_channel).socket().isInputShutdown() == false);
>        assert(((SocketChannel)port.m_channel).socket().isOutputShutdown() == false);
>        assert(((SocketChannel)port.m_channel).isOpen());
>        assert(((SocketChannel)port.m_channel).isRegistered());
>        assert(((SocketChannel)port.m_channel).keyFor(m_selector) != null);
>        assert(((SocketChannel)port.m_channel).keyFor(m_selector) == port.m_selectionKey);
>        if (m_selector.selectedKeys().contains(port.m_selectionKey)) {
>            assert((port.m_selectionKey.interestOps() & SelectionKey.OP_READ) != 0);
>            assert((port.m_selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
>        } else {
>            if (port.isRunning()) {
>                assert(port.m_selectionKey.interestOps() == 0);
>            } else {
>                port.m_selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
>                assert((port.interestOps() & SelectionKey.OP_READ) != 0);
>                assert((port.interestOps() & SelectionKey.OP_WRITE) != 0);
>            }
>        }
>        assert(m_selector.isOpen());
>        assert(m_selector.keys().contains(port.m_selectionKey));
>    }
OP_READ | OP_WRITE is set as the interest ops every time through,
and there is no other code that changes the interest ops during
execution. The application will run for a while and then one of
the connections will stop being selected on both sides. If I step
in with the debugger on either side everything looks correct. The
keys have the correct interest ops and the selectors have the
keys in their key set.

What I suspect is happening is that a bug on one node stops the
socket from being selected (for both read and write), and
eventually the socket fills up and can't be written to by the
other side.
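
For illustration, here is a rough sketch of the kind of instrumentation
that could confirm this (the class and method names are made up; this is
not the application's actual code). The selection loop would call
touch() for every key returned by select(), and the watchdog reports any
key that goes quiet:

import java.nio.channels.SelectionKey;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SelectionWatchdog extends Thread {
    // Last time each key was returned by select(), updated by the selection thread.
    private final Map<SelectionKey, Long> lastSelected =
            new ConcurrentHashMap<SelectionKey, Long>();

    void touch(SelectionKey key) {
        lastSelected.put(key, Long.valueOf(System.currentTimeMillis()));
    }

    public void run() {
        while (true) {
            try { Thread.sleep(10000); } catch (InterruptedException e) { return; }
            long now = System.currentTimeMillis();
            for (Map.Entry<SelectionKey, Long> entry : lastSelected.entrySet()) {
                if (now - entry.getValue().longValue() > 60000) {
                    // A healthy connection with OP_READ | OP_WRITE set should not
                    // go unselected for a minute while traffic is flowing.
                    System.out.println("Key " + entry.getKey()
                            + " not selected for 60s, interestOps="
                            + entry.getKey().interestOps());
                }
            }
        }
    }
}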

If I can get my VPN access together tomorrow I will run with
-XX:+UseMembar and also try running on some 8-core AMD machines.
Otherwise I will have to get to it Wednesday.

Thanks,

Ariel Weisberg


On Tue, 14 Jul 2009 05:00 +1000, "David Holmes"
<[3][email protected]> wrote:

Martin,



I don't think this is due to LBQ/D. This is looking similar to a
couple of other ReentrantLock/AQS "lost wakeup" hangs that I've
got on the radar. We have a reproducible test case for one issue
but it only fails on one kind of system (an x4450). I'm on vacation
most of this week but will try to get back to this next week.



Ariel: one thing to try, please see if -XX:+UseMembar fixes the
problem.



Thanks,

David Holmes

-----Original Message-----
From: Martin Buchholz [mailto:[4][email protected]]
Sent: Tuesday, 14 July 2009 8:38 AM
To: Ariel Weisberg
Cc: [5][email protected]; core-libs-dev;
[6][email protected]
Subject: Re: [concurrency-interest] LinkedBlockingDeque deadlock?

  I did some stack trace eyeballing and did a mini-audit of the
  LinkedBlockingDeque code, with a view to finding possible
  bugs,
  and came up empty.  Maybe it's a deep bug in hotspot?
  Ariel, it would be good if you could get a reproducible test
  case soonish,
  while someone on the planet has the motivation and familiarity
  to fix it.
  In another month I may disavow all knowledge of
  j.u.c.*Blocking*
  Martin

On Wed, Jul 8, 2009 at 15:57, Ariel Weisberg
<[7][email protected]> wrote:

  Hi,

> The poll()ing thread is blocked waiting for the internal lock, but
> there's no indication of any thread owning that lock. You're using an
> OpenJDK 6 build ... can you try JDK7 ?


  I got a chance to do that today. I downloaded JDK 7 from
  [8]http://www.java.net/download/jdk7/binaries/jdk-7-ea-bin-b63-linux-x64-02_jul_2009.bin
  and was able to reproduce the problem. I have attached the stack trace
  from running the 1.7 version. It is the same situation as before except
  there are 9 execution sites running on each host. There are no threads
  that are missing or that have been restarted. Foo Network thread
  (selector thread) and Network Thread - 0 are waiting on
  0x00002aaab43d3b28. I also ran with JDK 7 and 6 using LinkedBlockingQueue
  and was not able to recreate the problem using that structure.

> I don't recall anything similar to this, but I don't know what version
> that OpenJDK6 build relates to.


  The cluster is running on CentOS 5.3.
  >[aweisb...@3f ~]$ rpm -qi java-1.6.0-openjdk-1.6.0.0-0.30.b09.el5
  >Name        : java-1.6.0-openjdk           Relocations: (not relocatable)
  >Version     : 1.6.0.0                      Vendor: CentOS
  >Release     : 0.30.b09.el5                 Build Date: Tue 07 Apr 2009 07:24:52 PM EDT
  >Install Date: Thu 11 Jun 2009 03:27:46 PM EDT      Build Host: [9]builder10.centos.org
  >Group       : Development/Languages        Source RPM: java-1.6.0-openjdk-1.6.0.0-0.30.b09.el5.src.rpm
  >Size        : 76336266                     License: GPLv2 with exceptions
  >Signature   : DSA/SHA1, Wed 08 Apr 2009 07:55:13 AM EDT, Key ID a8a447dce8562897
  >URL         : [10]http://icedtea.classpath.org/
  >Summary     : OpenJDK Runtime Environment
  >Description :
  >The OpenJDK runtime environment.

> Make sure you haven't missed any exceptions occurring in other threads.

  There are no threads missing in the application (terminated threads are
  not replaced) and there is a try catch pair (prints error and rethrows)
  around the run loop of each thread. It is possible that an exception may
  have been swallowed up somewhere.

>A small reproducible test case from you would be useful.

  I am working on that. I wrote a test case that mimics the application's
  use of the LBD, but I have not succeeded in reproducing the problem in
  the test case. The app has a single thread (network selector) that polls
  the LBD and several threads (ExecutionSites, and network threads that
  return results from remote ExecutionSites) that offer results into the
  queue. About 120k items will go into/out of the deque each second. In
  the actual app the problem is reproducible but inconsistent. If I run on
  my dual core laptop I can't reproduce it, and it is less likely to occur
  with a small cluster, but with 6 nodes (~560k transactions/sec) the
  problem will usually appear. Sometimes the cluster will run for several
  minutes without issue and other times it will deadlock
  immediately.
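
  For illustration only, here is a minimal sketch of that offer/poll
  pattern (the thread counts, queue capacity, and class name are made up;
  this is not the test case or the application code):

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

public class LBDOfferPollStress {
    public static void main(String[] args) throws InterruptedException {
        // Bounded so the sketch does not run out of memory if the producers
        // outpace the single consumer; offer() simply returns false when full.
        final LinkedBlockingDeque<Object> mailbox =
                new LinkedBlockingDeque<Object>(100000);
        final AtomicLong polled = new AtomicLong(0);

        // Several producers offering results, like the ExecutionSites and the
        // network threads that return results from remote ExecutionSites.
        for (int ii = 0; ii < 8; ii++) {
            new Thread() {
                public void run() {
                    while (true) {
                        mailbox.offer(new Object());
                    }
                }
            }.start();
        }

        // A single consumer using the non-blocking poll(), like the network
        // selector thread.
        new Thread() {
            public void run() {
                while (true) {
                    if (mailbox.poll() != null) {
                        polled.incrementAndGet();
                    }
                }
            }
        }.start();

        // Report throughput once a second.
        while (true) {
            Thread.sleep(1000);
            System.out.println("polled/sec: " + polled.getAndSet(0));
        }
    }
}
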
  Thanks,
  Ariel

On Wed, 08 Jul 2009 05:14 +1000, "Martin Buchholz"
<[11][email protected]> wrote:
> [+core-libs-dev]
>
> Doug Lea and I are (slowly) working on a new version of
> LinkedBlockingDeque.
> I was not aware of a deadlock but can vaguely imagine how it might happen.
> A small reproducible test case from you would be useful.
>
> Unfinished work in progress can be found here:
> [12]http://cr.openjdk.java.net/~martin/webrevs/openjdk7/BlockingQueue/
>
> Martin


  On Wed, 08 Jul 2009 05:14 +1000, "David Holmes"
  <[13][email protected]> wrote:

> Ariel,
>
> The poll()ing thread is blocked waiting for the internal lock, but
> there's no indication of any thread owning that lock. You're using an
> OpenJDK 6 build ... can you try JDK7 ?
>
> I don't recall anything similar to this, but I don't know what version
> that OpenJDK6 build relates to.
>
> Make sure you haven't missed any exceptions occurring in other threads.
>
> David Holmes
>
> > -----Original Message-----
> > From: [14][email protected]
> > [mailto:[15][email protected]] On Behalf Of Ariel Weisberg
> > Sent: Wednesday, 8 July 2009 8:31 AM
> > To: [16][email protected]
> > Subject: [concurrency-interest] LinkedBlockingDeque deadlock?
> >
> >
> > Hi all,
> >
> > I did a search on LinkedBlockingDeque and didn't find anything similar
> > to what I am seeing. Attached is the stack trace from an application
> > that is deadlocked with three threads waiting for 0x00002aaab3e91080
> > (threads "ExecutionSite: 26", "ExecutionSite:27", and "Network
> > Selector"). The execution sites are attempting to offer results to the
> > deque and the network thread is trying to poll for them using the
> > non-blocking version of poll. I am seeing the network thread never
> > return from poll (straight poll()). Do my eyes deceive me?
> >
> > Thanks,
> >
> > Ariel Weisberg
> >
>

  _______________________________________________
  Concurrency-interest mailing list
  [17][email protected]
  [18]http://cs.oswego.edu/mailman/listinfo/concurrency-interest

References

1. mailto:[email protected]
2. mailto:[email protected]
3. mailto:[email protected]
4. mailto:[email protected]
5. mailto:[email protected]
6. mailto:[email protected]
7. mailto:[email protected]
8. http://www.java.net/download/jdk7/binaries/jdk-7-ea-bin-b63-linux-x64-02_jul_2009.bin
9. http://builder10.centos.org/
10. http://icedtea.classpath.org/
11. mailto:[email protected]
12. http://cr.openjdk.java.net/%7Emartin/webrevs/openjdk7/BlockingQueue/
13. mailto:[email protected]
14. mailto:[email protected]
15. mailto:[email protected]
16. mailto:[email protected]
17. mailto:[email protected]
18. http://cs.oswego.edu/mailman/listinfo/concurrency-interest
import java.io.IOException;
import java.nio.*;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A class implementing a throughput test across several TCP sockets. This class implements both
 * the client and the server.
 */
public class TCPThroughput {
    
    /**
     * A port is a wrapper around a socket channel. The default behavior of this port when handling work is
     * to read a large quantity of data if it is available and write a large quantity of data if the channel has capacity.
     *
     */
    private static class Port {
        
        /**
         * Dummy ByteBuffer used as scratch space for both reads and writes; its contents are never examined.
         */
        private ByteBuffer m_buffer = ByteBuffer.allocateDirect(expectedPacketSize);
        
        /**
         * Set to true if a socket channel operation throws an IOException. Causes the socket channel to be closed 
         * by the selector thread when this port is reached in the change list.
         */
        public volatile boolean isDead = false;
        
        public final SocketChannel m_channel;
        public final int m_port;
        final SelectionKey m_selectionKey;
        
        public Port(SocketChannel channel, SelectionKey key, int port) {
            m_channel = channel;
            m_port = port;
            m_selectionKey = key;
        }
        
        protected int m_readyOps = 0;
        
        public void handleWork() {
            try {
                m_buffer.clear();
                if (m_selectionKey.isReadable()) {
                    bytesReceived.addAndGet(m_channel.read(m_buffer));
                }
                m_buffer.clear();
                if (m_selectionKey.isWritable()) {
                    bytesSent.addAndGet(m_channel.write(m_buffer));
                }
            } catch (IOException e) {
                e.printStackTrace();
                isDead = true;
            } finally {
                addToChangeList(this);
            }
        }
    }
    
    /*
     * Selected ports is used as the point of synchronization between the selection
     * thread and the watchdog thread. It also protects m_ports which is shared between
     * the two threads.
     */
    private static final HashSet<Port> selectedPorts = new HashSet<Port>();
    private static final ArrayList<Port> m_ports = new ArrayList<Port>();
    
    /*
     * Various fields containing statistical information and keeping track of the last time a message was received.
     */
    public static boolean messageReceived = false;
    public static long firstMessageReceived = 0;
    public static long lastMessageReceived = 0;
    public static final AtomicLong bytesReceived = new AtomicLong(0);
    public static final AtomicLong bytesSent = new AtomicLong(0);
    
    /*
     * Structures for the change list
     */
    private static final ArrayList<Port> m_selectorUpdates_1 = new ArrayList<Port>();
    private static final ArrayList<Port> m_selectorUpdates_2 = new ArrayList<Port>();
    private static ArrayList<Port> m_activeUpdateList = new ArrayList<Port>();
    
    /*
     * Lock used to protect the selector change lists that are shared between the ports
     * and the selector thread.
     */
    private static final Object m_lock = new Object();
    
    private static final int port = 29600;
    private static final int numPorts = 30;
    static int expectedPacketSize = 600;

    private static ServerSocketChannel servers[];
    private static Selector selector;
    private static ExecutorService executor = Executors.newFixedThreadPool(6);
    
    private static volatile boolean selectorWoke = false;
    private static volatile boolean shouldContinue = true;
    
    private static int seconds = 60;
    private static String addressString = "localhost";
    private static InetAddress address;
    
    /**
     * Called by the selection thread to process any ports added to the changelist. Closes ports that have died due to an IOException
     * and sets the interest ops for ports that have finished handling work.
     */
    protected static void installInterests() {
        
        // swap the update lists to avoid contention while
        // draining the requested values. also guarantees
        // that the end of the list will be reached if code
        // appends to the update list without bound.
        ArrayList<Port> oldlist;
        synchronized(m_lock) {
            if (m_activeUpdateList == m_selectorUpdates_1) {
                oldlist = m_selectorUpdates_1;
                m_activeUpdateList = m_selectorUpdates_2;
            }
            else {
                oldlist = m_selectorUpdates_2;
                m_activeUpdateList = m_selectorUpdates_1;
            }
        }
        
        for (Port i : oldlist) {
            try {
                if (i.isDead) {
                    System.out.println("Closing port " + i.m_port);
                    synchronized (selectedPorts) {
                        m_ports.remove(i);
                    }
                    i.m_selectionKey.cancel();
                    try {
                        i.m_channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                } else {
                    if (i.m_selectionKey.isValid()) i.m_selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }
            } catch(CancelledKeyException ex) {
                // continue if a key is canceled.
            }
        }
        oldlist.clear();
    }
    
    /**
     * Method for a port to add itself to the change list so the selection thread can update its interest ops.
     * @param port
     */
    private static void addToChangeList(Port port) {
        synchronized (m_lock) {
            m_activeUpdateList.add(port);
        }
        selector.wakeup();
    }

    /*
     * A server opens up some server sockets and accepts connections on them. It terminates when no messages have been received for 5 seconds.
     * A client opens up some connections and sends messages for a fixed period of time.
     */
    public static void main(String[] args) {
        boolean runServer = false;
        for (String arg : args) {
            String[] parts = arg.split("=",2);
            if (parts.length == 1) {
                if (parts[0].equals("server")) {
                    runServer = true;
                }
                continue;
            } else if (parts[1].startsWith("${")) {
                continue;
            } else if (parts[0].equals("address")) {
                addressString = parts[1];
            } else if (parts[0].equals("seconds")) {
                seconds = Integer.parseInt(parts[1]);
            }
        }
        
        try {
            address = InetAddress.getByName(addressString);
        } catch (UnknownHostException e) {
            e.printStackTrace();
            System.exit(-1);
        }
        
        try {
            selector = Selector.open();
        } catch (IOException e1) {
            e1.printStackTrace();
            System.exit(-1);
        }
        
        if (runServer) {
            servers = new ServerSocketChannel[numPorts];
            for (int ii = 0; ii < numPorts; ii++) {
                try {
                    servers[ii] = ServerSocketChannel.open();
                    servers[ii].configureBlocking(false);
                    servers[ii].socket().bind(new InetSocketAddress(port + ii));
                    SelectionKey serverKey = servers[ii].register(selector,
                            SelectionKey.OP_ACCEPT);
                    serverKey.attach(servers[ii]);
                } catch (IOException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
            }
        } else {
            for (int ii = 0; ii < numPorts; ii++) {
                Socket tempsocket = null;
                SocketChannel tempsocketchannel = null;
                try {
                    tempsocketchannel = SocketChannel.open();
                    tempsocket = tempsocketchannel.socket();
                    tempsocket.setTcpNoDelay(false);
                    tempsocket.setReceiveBufferSize(16777216);
                    tempsocket.setSendBufferSize(16777216);
                    tempsocketchannel.configureBlocking(true);
                    tempsocketchannel.connect(new InetSocketAddress( address, port + ii));
                    tempsocketchannel.configureBlocking(false);
                    SelectionKey key = tempsocketchannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    Port p = new Port(tempsocketchannel, key, ii);
                    key.attach(p);
                    m_ports.add(p);
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                    System.exit(-1);
                } catch (IOException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
            }
        }
        
        if (!runServer) {
            /*
             * Timer thread that terminates the Selector loop 
             */
            new Thread() {
                public void run() {
                    try {
                        Thread.sleep(seconds * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    shouldContinue = false;
                }
            }.start();
        }
        
        /*
         * Watchdog thread that checks every 10 seconds that each port has been selected for read/write
         * at least once since the last check, and that the selector has returned at least once.
         */
        new Thread() {
            public void run() {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                while (true) {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    synchronized (selectedPorts) {
                        for (Port port : m_ports) {
                            if (!selectedPorts.contains(port)) {
                                System.out.println("A port(" + port.m_port + ") interestOps(" + port.m_selectionKey.interestOps() + ")has not been selected for read/write for 60 seconds!");
                            }
                        }
                        if (!selectorWoke) {
                            System.out.println("Selector hasn't returned for 60 seconds!");
                        }
                        selectorWoke = false;
                        selectedPorts.clear();
                    }
                }
            }
        }.start();

        while (shouldContinue) {
            /*
             * Check if no messages have been received for 5 seconds. This is the indication that no more messages are
             * forthcoming and that it is time to print stats and terminate.
             */
            if (messageReceived && runServer) {
                long now = System.currentTimeMillis();
                if (now - lastMessageReceived > 5000) {
                    long delta = (lastMessageReceived - firstMessageReceived) / 1000;
                    if (delta == 0) { delta = 1; }
                    System.out
                            .println("TCPThroughputReceiver result:\n\tExpected packet size == " + expectedPacketSize + "\n\tmessagesReceived == "
                                    + bytesReceived.get()
                                    / expectedPacketSize
                                    + "\n\tbytesReceived == "
                                    + bytesReceived.get() +
                                    "\n\tbytesSent == " + bytesSent.get()
                                    + "\nmegabytes/sec == " + ((bytesReceived.get() + bytesSent.get()) / delta / 1024 / 1024)
                                    + "\nmegabits/sec == " + ((bytesReceived.get() + bytesSent.get()) / delta / 1024 / 1024 * 8)
                                    + "\n\nReceived first message "
                                    + firstMessageReceived
                                    + " and last message "
                                    + lastMessageReceived
                                    + " delta is "
                                    + delta);
                    System.exit(0);
                }
            }
            
            try {
                selector.selectNow();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(-1);
            }
            selectorWoke = true;
            installInterests();
            
            for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it
                    .hasNext();) {
                SelectionKey key = it.next();
                it.remove();

                if (key.isAcceptable()) {
                    try {
                        ServerSocketChannel server = (ServerSocketChannel)key.attachment();
                        int port = 0;
                        for (int ii = 0; ii < servers.length; ii++) {
                            if (server == servers[ii]) {
                                port = ii;
                            }
                        }
                        SocketChannel client = server.accept();
                        assert client != null;
                        Socket tempsocket = client.socket();
                        tempsocket.setReceiveBufferSize(16777216);
                        tempsocket.setSendBufferSize(16777216);
                        tempsocket.setTcpNoDelay(false);
                        
                        client.configureBlocking(false);
                        SelectionKey clientKey = client.register(selector,
                                SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        Port newPort = new Port(client, clientKey, port);
                        synchronized (selectedPorts) {
                            m_ports.add(newPort);
                        }
                        clientKey.attach(newPort);
                    } catch (IOException e) {
                        e.printStackTrace();
                        System.exit(-1);
                    }
                } else {
                    assert key.isReadable() || key.isWritable();
                    if (!messageReceived && key.isReadable()) {
                        messageReceived = true;
                        System.out.println("First message received");
                        firstMessageReceived = System.currentTimeMillis();
                    }
                    if (key.isReadable()) {
                        lastMessageReceived = System.currentTimeMillis();
                    }
                    final Port port = (Port) key.attachment();
                    synchronized (selectedPorts) {
                        selectedPorts.add(port);
                    }
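                    // Clear the interest ops while a worker thread handles this port;
                    // installInterests() restores OP_READ | OP_WRITE after handleWork()
                    // adds the port back to the change list.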
                    key.interestOps(0);
                    executor.execute(new Runnable() {
                        public void run() {
                            port.handleWork();
                        }
                    });
                }
            }
        }
    }
}
