[
https://issues.apache.org/jira/browse/IGNITE-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399009#comment-15399009
]
Yakov Zhdanov commented on IGNITE-3412:
---------------------------------------
Guys this looks really weird.
Can you please try this class with debug output?
{noformat}
/**
*
*/
private class SocketWriter extends IgniteSpiThread {
/** */
private final Object mux = new Object();
/** */
private Socket sock;
/** */
private boolean clientAck;
/** */
private final Queue<TcpDiscoveryAbstractMessage> queue = new
ArrayDeque<>();
/** */
private final long socketTimeout;
/** */
private TcpDiscoveryAbstractMessage unackedMsg;
/**
*
*/
protected SocketWriter() {
super(spi.ignite().name(), "tcp-client-disco-sock-writer", log);
socketTimeout = spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() :
spi.getSocketTimeout();
}
/**
* @param msg Message.
*/
private void sendMessage(TcpDiscoveryAbstractMessage msg) {
synchronized (mux) {
queue.add(msg);
mux.notifyAll();
}
}
/**
* @param sock Socket.
* @param clientAck {@code True} is server supports client message
acknowlede.
*/
private void setSocket(Socket sock, boolean clientAck) {
synchronized (mux) {
this.sock = sock;
this.clientAck = clientAck;
unackedMsg = null;
mux.notifyAll();
}
}
/**
* @return {@code True} if connection is alive.
*/
public boolean isOnline() {
synchronized (mux) {
return sock != null;
}
}
/**
* @param res Acknowledge response.
*/
void ackReceived(TcpDiscoveryClientAckResponse res) {
synchronized (mux) {
if (unackedMsg != null) {
assert unackedMsg.id().equals(res.messageId()) : unackedMsg;
unackedMsg = null;
}
mux.notifyAll();
}
}
/** {@inheritDoc} */
@Override public void interrupt() {
super.interrupt();
U.debug(log, "SocketWriter has been interrupted: " +
System.identityHashCode(this));
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
TcpDiscoveryAbstractMessage msg = null;
while (!Thread.currentThread().isInterrupted()) {
Socket sock;
U.debug(log, "1 SocketWriter thread interrupted status " +
"[status=" + Thread.currentThread().isInterrupted() +
", hash=" + System.identityHashCode(this) + ']');
synchronized (mux) {
sock = this.sock;
if (sock == null) {
mux.wait();
continue;
}
if (msg == null)
msg = queue.poll();
if (msg == null) {
mux.wait();
continue;
}
}
U.debug(log, "2 SocketWriter thread interrupted status " +
"[status=" + Thread.currentThread().isInterrupted() +
", hash=" + System.identityHashCode(this) + ']');
for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr :
spi.sndMsgLsnrs)
msgLsnr.apply(msg);
boolean ack = clientAck && !(msg instanceof
TcpDiscoveryPingResponse);
try {
if (ack) {
synchronized (mux) {
assert unackedMsg == null : "Unacked=" + unackedMsg
+ ", received=" + msg;
unackedMsg = msg;
}
}
U.debug(log, "3 SocketWriter thread interrupted status " +
"[status=" + Thread.currentThread().isInterrupted() +
", hash=" + System.identityHashCode(this) + ']');
spi.writeToSocket(
sock,
msg,
socketTimeout);
msg = null;
U.debug(log, "4 SocketWriter thread interrupted status " +
"[status=" + Thread.currentThread().isInterrupted() +
", hash=" + System.identityHashCode(this) + ']');
if (ack) {
long waitEnd = U.currentTimeMillis() +
(spi.failureDetectionTimeoutEnabled() ?
spi.failureDetectionTimeout() :
spi.getAckTimeout());
TcpDiscoveryAbstractMessage unacked;
synchronized (mux) {
while (unackedMsg != null && U.currentTimeMillis()
< waitEnd)
mux.wait(waitEnd);
unacked = unackedMsg;
unackedMsg = null;
}
U.debug(log, "5 SocketWriter thread interrupted status
" +
"[status=" + Thread.currentThread().isInterrupted()
+
", hash=" + System.identityHashCode(this) + ']');
if (unacked != null) {
if (log.isDebugEnabled())
log.debug("Failed to get acknowledge for
message, will try to reconnect " +
"[msg=" + unacked +
(spi.failureDetectionTimeoutEnabled() ?
", failureDetectionTimeout=" +
spi.failureDetectionTimeout() :
", timeout=" + spi.getAckTimeout()) + ']');
throw new IOException("Failed to get acknowledge
for message: " + unacked);
}
}
}
catch (IOException e) {
log.error("DEBUG error thrown.", e); // TODO
if (log.isDebugEnabled())
U.error(log, "Failed to send node left message (will
stop anyway) " +
"[sock=" + sock + ", msg=" + msg + ']', e);
U.debug(log, "6 SocketWriter thread interrupted status " +
"[status=" + Thread.currentThread().isInterrupted() +
", hash=" + System.identityHashCode(this) + ']');
U.closeQuiet(sock);
U.debug(log, "7 SocketWriter thread interrupted status " +
"[status=" + Thread.currentThread().isInterrupted() +
", hash=" + System.identityHashCode(this) + ']');
synchronized (mux) {
if (sock == this.sock)
this.sock = null; // Connection has dead.
}
}
catch (IgniteCheckedException e) {
log.error("DEBUG Ignite error thrown.", e); // TODO
U.error(log, "Failed to send message: " + msg, e);
U.debug(log, "8 SocketWriter thread interrupted status " +
"[status=" + Thread.currentThread().isInterrupted() +
", hash=" + System.identityHashCode(this) + ']');
msg = null;
}
}
}
}
{noformat}
> Client instance hangs on close
> ------------------------------
>
> Key: IGNITE-3412
> URL: https://issues.apache.org/jira/browse/IGNITE-3412
> Project: Ignite
> Issue Type: Bug
> Affects Versions: 1.6
> Reporter: Alexei Scherbakov
> Assignee: Alexei Scherbakov
> Fix For: 1.7
>
> Attachments: SocketsTest.zip, threadDump.txt
>
>
> In some cases calling close on Ignite client instance will lead to deadlock.
> The deadlock happens because of the following.
> Socket writer is waiting for new messages.
> {code}
> "tcp-client-disco-sock-writer-#2%null%" #100 prio=6 os_prio=0
> tid=0x000000005fad2800 nid=0x13bc in Object.wait() [0x0000000067d0e000]
> java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at
> org.apache.ignite.spi.discovery.tcp.ClientImpl$SocketWriter.body(ClientImpl.java:1051)
> - locked <0x00000000863da2f8> (a java.lang.Object)
> at org.apache.ignite.spi.IgniteSpiThread.run(IgniteSpiThread.java:62)
> {code}
> The closing process hangs because TcpDiscoverySPI waits until this
> writer has terminated
> {code}
> "Thread-6" #29 prio=6 os_prio=0 tid=0x000000005a740000 nid=0x17e8 in
> Object.wait() [0x000000006077e000]
> java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Thread.join(Thread.java:1245)
> - locked <0x00000000863da010> (a
> org.apache.ignite.spi.discovery.tcp.ClientImpl$SocketWriter)
> at java.lang.Thread.join(Thread.java:1319)
> at
> org.apache.ignite.internal.util.IgniteUtils.join(IgniteUtils.java:4476)
> at
> org.apache.ignite.spi.discovery.tcp.ClientImpl.spiStop(ClientImpl.java:295)
> at
> org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStop(TcpDiscoverySpi.java:1905)
> at
> org.apache.ignite.internal.managers.GridManagerAdapter.stopSpi(GridManagerAdapter.java:325)
> at
> org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.stop(GridDiscoveryManager.java:1336)
> at org.apache.ignite.internal.IgniteKernal.stop0(IgniteKernal.java:1940)
> at org.apache.ignite.internal.IgniteKernal.stop(IgniteKernal.java:1812)
> at
> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.stop0(IgnitionEx.java:2248)
> - locked <0x00000000858e77a8> (a
> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance)
> at
> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.stop(IgnitionEx.java:2211)
> at org.apache.ignite.internal.IgnitionEx.stop(IgnitionEx.java:322)
> at org.apache.ignite.Ignition.stop(Ignition.java:224)
> at org.apache.ignite.internal.IgniteKernal.close(IgniteKernal.java:2921)
> at ru.sbrf.ggcod.loader.job.MainLoader.run(MainLoader.java:123)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> There is some race that leads to a situation where the writer hangs in the
> {{Object.wait}} method, ignoring the interrupted flag that was set at some
> point in time.
> The full thread dump is attached.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)