http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java index d108b56..7af6139 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java @@ -24,6 +24,7 @@ import java.nio.ByteOrder; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.nio.GridNioFilter; import org.apache.ignite.internal.util.nio.GridNioFilterAdapter; @@ -36,6 +37,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListener; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.nio.GridNioSessionImpl; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; /** @@ -201,7 +203,10 @@ public class IpcToNioAdapter<T> { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, + Object msg, + boolean fut, + IgniteInClosure<IgniteException> ackC) { assert ses == IpcToNioAdapter.this.ses; return send((Message)msg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java index 7987d3d..f110cf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java @@ -19,9 +19,11 @@ package org.apache.ignite.internal.util.nio; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; /** * Verifies that first bytes received in accepted (incoming) @@ -73,9 +75,10 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter { @Override public GridNioFuture<?> onSessionWrite( GridNioSession ses, Object msg, - boolean fut + boolean fut, + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { - return proceedSessionWrite(ses, msg, fut); + return proceedSessionWrite(ses, msg, fut, ackC); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java index 5d90cdb..d55bc54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java @@ -19,10 +19,12 @@ package org.apache.ignite.internal.util.nio; import java.util.concurrent.Executor; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.util.worker.GridWorkerPool; +import org.apache.ignite.lang.IgniteInClosure; /** * Enables multithreaded notification of session opened, message received and session closed events. @@ -110,9 +112,10 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter { @Override public GridNioFuture<?> onSessionWrite( GridNioSession ses, Object msg, - boolean fut + boolean fut, + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { - return proceedSessionWrite(ses, msg, fut); + return proceedSessionWrite(ses, msg, fut, ackC); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java index 343e625..b81086a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java @@ -20,10 +20,12 @@ package org.apache.ignite.internal.util.nio; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteInClosure; /** * Filter that transforms byte buffers to user-defined objects and vice-versa @@ -82,16 +84,17 @@ public class GridNioCodecFilter extends GridNioFilterAdapter { @Override public GridNioFuture<?> onSessionWrite( GridNioSession ses, Object msg, - boolean fut + boolean fut, + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { // No encoding needed in direct mode. if (directMode) - return proceedSessionWrite(ses, msg, fut); + return proceedSessionWrite(ses, msg, fut, ackC); try { ByteBuffer res = parser.encode(ses, msg); - return proceedSessionWrite(ses, res, fut); + return proceedSessionWrite(ses, res, fut, ackC); } catch (IOException e) { throw new GridNioException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java index be77d39..eab4909 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioEmbeddedFuture.java @@ -31,6 +31,13 @@ public class GridNioEmbeddedFuture<R> extends GridNioFutureImpl<R> { private static final long serialVersionUID = 0L; /** + * + */ + public GridNioEmbeddedFuture() { + super(null); + } + + /** * Callback to notify that future is finished. * This method must delegate to {@link #onDone(GridNioFuture, Throwable)} method. * http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java index f7928c4..9163a4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.lang.IgniteInClosure; /** * This interface defines the general element in transformation chain between the nio server and @@ -106,13 +108,15 @@ public interface GridNioFilter { * @param ses Session instance. * @param msg Message to send. * @param fut {@code True} if write future should be created. + * @param ackC Closure invoked when message ACK is received. * @return Write future or {@code null}. * @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter. */ public GridNioFuture<?> proceedSessionWrite( GridNioSession ses, Object msg, - boolean fut + boolean fut, + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException; /** @@ -155,10 +159,14 @@ public interface GridNioFilter { * @param ses Session on which message should be written. * @param msg Message being written. * @param fut {@code True} if write future should be created. + * @param ackC Closure invoked when message ACK is received. * @return Write future or {@code null}. * @throws GridNioException If GridNioException occurred while handling event. */ - public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException; + public GridNioFuture<?> onSessionWrite(GridNioSession ses, + Object msg, + boolean fut, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException; /** * Invoked when a new messages received. http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java index 58ddae5..4ede4b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.lang.IgniteInClosure; /** * Class that defines the piece for application-to-network and vice-versa data conversions @@ -111,11 +113,12 @@ public abstract class GridNioFilterAdapter implements GridNioFilter { @Override public GridNioFuture<?> proceedSessionWrite( GridNioSession ses, Object msg, - boolean fut + boolean fut, + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { checkNext(); - return nextFilter.onSessionWrite(ses, msg, fut); + return nextFilter.onSessionWrite(ses, msg, fut, ackC); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java index 8cc690b..ec59020 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.lang.IgniteInClosure; /** * Filter chain implementation for nio server filters. @@ -184,9 +186,10 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter { @Override public GridNioFuture<?> onSessionWrite( GridNioSession ses, Object msg, - boolean fut + boolean fut, + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { - return tail.onSessionWrite(ses, msg, fut); + return tail.onSessionWrite(ses, msg, fut, ackC); } /** @@ -259,9 +262,11 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) - throws IgniteCheckedException { - return proceedSessionWrite(ses, msg, fut); + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, + Object msg, + boolean fut, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { + return proceedSessionWrite(ses, msg, fut, ackC); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java index 3d18ab7..2835a22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java @@ -59,11 +59,6 @@ public class GridNioFinishedFuture<R> extends GridFinishedFuture<R> implements G } /** {@inheritDoc} */ - @Override public void ackClosure(IgniteInClosure<IgniteException> closure) { - // No-op. - } - - /** {@inheritDoc} */ @Override public void onAckReceived() { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java index 6c0c9c6..4d1fee6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java @@ -43,13 +43,6 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> { public boolean skipRecovery(); /** - * Sets ack closure which will be applied when ack received. - * - * @param c Ack closure. - */ - public void ackClosure(IgniteInClosure<IgniteException> c); - - /** * The method will be called when ack received. */ public void onAckReceived(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java index fe97039..6a94a54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java @@ -30,10 +30,17 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi private static final long serialVersionUID = 0L; /** */ - protected boolean msgThread; + private boolean msgThread; /** */ - protected IgniteInClosure<IgniteException> ackClosure; + protected final IgniteInClosure<IgniteException> ackC; + + /** + * @param ackC Ack closure. + */ + public GridNioFutureImpl(IgniteInClosure<IgniteException> ackC) { + this.ackC = ackC; + } /** {@inheritDoc} */ @Override public void messageThread(boolean msgThread) { @@ -51,18 +58,13 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi } /** {@inheritDoc} */ - @Override public void ackClosure(IgniteInClosure<IgniteException> closure) { - ackClosure = closure; - } - - /** {@inheritDoc} */ @Override public void onAckReceived() { // No-op. } /** {@inheritDoc} */ @Override public IgniteInClosure<IgniteException> ackClosure() { - return ackClosure; + return ackC; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index fefdf15..7f25e40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -74,7 +74,6 @@ import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import sun.nio.ch.DirectBuffer; -import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.ACK_CLOSURE; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPERATION; @@ -481,22 +480,26 @@ public class GridNioServer<T> { * @param ses Session. * @param msg Message. * @param createFut {@code True} if future should be created. + * @param ackC Closure invoked when message ACK is received. * @return Future for operation. */ - GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) throws IgniteCheckedException { + GridNioFuture<?> send(GridNioSession ses, + ByteBuffer msg, + boolean createFut, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert ses instanceof GridSelectorNioSessionImpl : ses; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; if (createFut) { - NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg); + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, ackC); send0(impl, fut, false); return fut; } else { - SessionWriteRequest req = new WriteRequestImpl(ses, msg, true); + SessionWriteRequest req = new WriteRequestImpl(ses, msg, true, ackC); send0(impl, req, false); @@ -508,23 +511,27 @@ public class GridNioServer<T> { * @param ses Session. * @param msg Message. * @param createFut {@code True} if future should be created. + * @param ackC Closure invoked when message ACK is received. * @return Future for operation. */ - GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) throws IgniteCheckedException { + GridNioFuture<?> send(GridNioSession ses, + Message msg, + boolean createFut, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert ses instanceof GridSelectorNioSessionImpl; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; if (createFut) { NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, - skipRecoveryPred.apply(msg)); + skipRecoveryPred.apply(msg), ackC); send0(impl, fut, false); return fut; } else { - SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg)); + SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC); send0(impl, req, false); @@ -544,11 +551,6 @@ public class GridNioServer<T> { int msgCnt = sys ? ses.offerSystemFuture(req) : ses.offerFuture(req); - IgniteInClosure<IgniteException> ackC; - - if (!sys && (ackC = ses.removeMeta(ACK_CLOSURE.ordinal())) != null) - req.ackClosure(ackC); - if (ses.closed()) { if (ses.removeFuture(req)) { IOException err = new IOException("Failed to send message (connection was closed): " + ses); @@ -597,8 +599,11 @@ public class GridNioServer<T> { GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; if (lsnr != null) { - NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg, - skipRecoveryPred.apply(msg)); + NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, + NioOperation.REQUIRE_WRITE, + msg, + skipRecoveryPred.apply(msg), + null); fut.listen(lsnr); @@ -2597,11 +2602,6 @@ public class GridNioServer<T> { } /** {@inheritDoc} */ - @Override public void ackClosure(IgniteInClosure<IgniteException> c) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ @Override public void onAckReceived() { throw new UnsupportedOperationException(); } @@ -2664,17 +2664,22 @@ public class GridNioServer<T> { private final boolean skipRecovery; /** */ - private IgniteInClosure<IgniteException> ackC; + private final IgniteInClosure<IgniteException> ackC; /** * @param ses Session. * @param msg Message. * @param skipRecovery Skip recovery flag. + * @param ackC Closure invoked when message ACK is received. */ - WriteRequestImpl(GridNioSession ses, Object msg, boolean skipRecovery) { + WriteRequestImpl(GridNioSession ses, + Object msg, + boolean skipRecovery, + IgniteInClosure<IgniteException> ackC) { this.ses = ses; this.msg = msg; this.skipRecovery = skipRecovery; + this.ackC = ackC; } /** {@inheritDoc} */ @@ -2693,11 +2698,6 @@ public class GridNioServer<T> { } /** {@inheritDoc} */ - @Override public void ackClosure(IgniteInClosure<IgniteException> c) { - ackC = c; - } - - /** {@inheritDoc} */ @Override public void onAckReceived() { assert msg instanceof Message; @@ -2798,6 +2798,8 @@ public class GridNioServer<T> { boolean accepted, @Nullable Map<Integer, ?> meta ) { + super(null); + op = NioOperation.REGISTER; this.sockCh = sockCh; @@ -2812,6 +2814,8 @@ public class GridNioServer<T> { * @param op Requested operation. */ NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op) { + super(null); + assert ses != null || op == NioOperation.DUMP_STATS : "Invalid params [ses=" + ses + ", op=" + op + ']'; assert op != null; assert op != NioOperation.REGISTER; @@ -2826,8 +2830,14 @@ public class GridNioServer<T> { * @param ses Session to change. * @param op Requested operation. * @param msg Message. + * @param ackC Closure invoked when message ACK is received. */ - NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, Object msg) { + NioOperationFuture(GridSelectorNioSessionImpl ses, + NioOperation op, + Object msg, + IgniteInClosure<IgniteException> ackC) { + super(ackC); + assert ses != null; assert op != null; assert op != NioOperation.REGISTER; @@ -2845,9 +2855,15 @@ public class GridNioServer<T> { * @param op Requested operation. * @param commMsg Direct message. * @param skipRecovery Skip recovery flag. + * @param ackC Closure invoked when message ACK is received. */ - NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op, - Message commMsg, boolean skipRecovery) { + NioOperationFuture(GridSelectorNioSessionImpl ses, + NioOperation op, + Message commMsg, + boolean skipRecovery, + IgniteInClosure<IgniteException> ackC) { + super(ackC); + assert ses != null; assert op != null; assert op != NioOperation.REGISTER; @@ -3013,7 +3029,10 @@ public class GridNioServer<T> { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, + Object msg, + boolean fut, + IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { if (directMode) { boolean sslSys = sslFilter != null && msg instanceof ByteBuffer; @@ -3032,10 +3051,10 @@ public class GridNioServer<T> { return null; } else - return send(ses, (Message)msg, fut); + return send(ses, (Message)msg, fut, ackC); } else - return send(ses, (ByteBuffer)msg, fut); + return send(ses, (ByteBuffer)msg, fut, ackC); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java index c1b60ab..21eabf3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.util.nio; import java.net.InetSocketAddress; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; /** @@ -107,8 +109,11 @@ public interface GridNioSession { /** * @param msg Message to be sent. + * @param ackC Optional closure invoked when ack for message is received. + * @throws IgniteCheckedException If failed. */ - public void sendNoFuture(Object msg) throws IgniteCheckedException; + public void sendNoFuture(Object msg, @Nullable IgniteInClosure<IgniteException> ackC) + throws IgniteCheckedException; /** * Gets metadata associated with specified key. http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java index 7424531..98a22d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java @@ -20,8 +20,10 @@ package org.apache.ignite.internal.util.nio; import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MAX_KEYS_CNT; @@ -105,7 +107,7 @@ public class GridNioSessionImpl implements GridNioSession { try { resetSendScheduleTime(); - return chain().onSessionWrite(this, msg, true); + return chain().onSessionWrite(this, msg, true, null); } catch (IgniteCheckedException e) { close(); @@ -115,9 +117,10 @@ public class GridNioSessionImpl implements GridNioSession { } /** {@inheritDoc} */ - @Override public void sendNoFuture(Object msg) throws IgniteCheckedException { + @Override public void sendNoFuture(Object msg, IgniteInClosure<IgniteException> ackC) + throws IgniteCheckedException { try { - chain().onSessionWrite(this, msg, false); + chain().onSessionWrite(this, msg, false, ackC); } catch (IgniteCheckedException e) { close(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java index bdb3a29..5385430 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java @@ -42,10 +42,7 @@ public enum GridNioSessionMetaKey { MARSHALLER_ID, /** Message writer. */ - MSG_WRITER, - - /** Ack closure. */ - ACK_CLOSURE; + MSG_WRITER; /** Maximum count of NIO session keys in system. */ public static final int MAX_KEYS_CNT = 64; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index d941bae..ab9b2eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -123,7 +123,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien /** {@inheritDoc} */ @Override public synchronized boolean sendMessage(UUID nodeId, Message msg, - IgniteInClosure<IgniteException> closure) throws IgniteCheckedException { + IgniteInClosure<IgniteException> c) throws IgniteCheckedException { assert nodeId != null; if (closed()) @@ -142,8 +142,8 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien markUsed(); - if (closure != null) - closure.apply(null); + if (c != null) + c.apply(null); return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 3397772..eff893f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -32,8 +32,6 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.ACK_CLOSURE; - /** * Grid client for NIO server. */ @@ -115,18 +113,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie // Node ID is never provided in asynchronous send mode. assert nodeId == null; - if (c != null) - ses.addMeta(ACK_CLOSURE.ordinal(), c); - - ses.sendNoFuture(msg); - - if (c != null) - ses.removeMeta(ACK_CLOSURE.ordinal()); + ses.sendNoFuture(msg, c); } catch (IgniteCheckedException e) { - if (c != null) - ses.removeMeta(ACK_CLOSURE.ordinal()); - if (log.isDebugEnabled()) log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java index 508c791..e24f3ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java @@ -42,13 +42,6 @@ public interface SessionWriteRequest { public boolean skipRecovery(); /** - * Sets ack closure which will be applied when ack received. - * - * @param c Ack closure. - */ - public void ackClosure(IgniteInClosure<IgniteException> c); - - /** * The method will be called when ack received. */ public void onAckReceived(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java index 8ed7db0..b4bd34a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.util.nio.GridNioFutureImpl; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; @@ -285,10 +286,11 @@ public class GridNioSslFilter extends GridNioFilterAdapter { @Override public GridNioFuture<?> onSessionWrite( GridNioSession ses, Object msg, - boolean fut + boolean fut, + IgniteInClosure<IgniteException> ackC ) throws IgniteCheckedException { if (directMode) - return proceedSessionWrite(ses, msg, fut); + return proceedSessionWrite(ses, msg, fut, ackC); ByteBuffer input = checkMessage(ses, msg); @@ -307,13 +309,13 @@ public class GridNioSslFilter extends GridNioFilterAdapter { if (hnd.isHandshakeFinished()) { hnd.encrypt(input); - return hnd.writeNetBuffer(); + return hnd.writeNetBuffer(ackC); } else { if (log.isDebugEnabled()) log.debug("Write request received during handshake, scheduling deferred write: " + ses); - return hnd.deferredWrite(input); + return hnd.deferredWrite(input, ackC); } } catch (SSLException e) { @@ -390,7 +392,7 @@ public class GridNioSslFilter extends GridNioFilterAdapter { try { hnd.closeOutbound(); - hnd.writeNetBuffer(); + hnd.writeNetBuffer(null); } catch (SSLException e) { U.warn(log, "Failed to shutdown SSL session gracefully (will force close) [ex=" + e + ", ses=" + ses + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java index 269e8b9..e268716 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java @@ -27,6 +27,7 @@ import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLException; import javax.net.ssl.SSLSession; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.util.nio.GridNioEmbeddedFuture; import org.apache.ignite.internal.util.nio.GridNioException; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.util.nio.GridNioFuture; import org.apache.ignite.internal.util.nio.GridNioFutureImpl; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import static javax.net.ssl.SSLEngineResult.HandshakeStatus; import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED; @@ -274,7 +276,7 @@ class GridNioSslHandler extends ReentrantLock { log.debug("Wrapped handshake data [status=" + res.getStatus() + ", handshakeStatus=" + handshakeStatus + ", ses=" + ses + ']'); - writeNetBuffer(); + writeNetBuffer(null); break; } @@ -412,16 +414,17 @@ class GridNioSslHandler extends ReentrantLock { * Adds write request to the queue. * * @param buf Buffer to write. + * @param ackC Closure invoked when message ACK is received. * @return Write future. */ - GridNioFuture<?> deferredWrite(ByteBuffer buf) { + GridNioFuture<?> deferredWrite(ByteBuffer buf, IgniteInClosure<IgniteException> ackC) { assert isHeldByCurrentThread(); GridNioEmbeddedFuture<Object> fut = new GridNioEmbeddedFuture<>(); ByteBuffer cp = copy(buf); - deferredWriteQueue.offer(new WriteRequest(fut, cp)); + deferredWriteQueue.offer(new WriteRequest(fut, cp, ackC)); return fut; } @@ -437,7 +440,7 @@ class GridNioSslHandler extends ReentrantLock { while (!deferredWriteQueue.isEmpty()) { WriteRequest req = deferredWriteQueue.poll(); - req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true)); + req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true, req.ackC)); } } @@ -475,14 +478,15 @@ class GridNioSslHandler extends ReentrantLock { * Copies data from out net buffer and passes it to the underlying chain. * * @return Write future. + * @param ackC Closure invoked when message ACK is received. * @throws GridNioException If send failed. */ - GridNioFuture<?> writeNetBuffer() throws IgniteCheckedException { + GridNioFuture<?> writeNetBuffer(IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException { assert isHeldByCurrentThread(); ByteBuffer cp = copy(outNetBuf); - return parent.proceedSessionWrite(ses, cp, true); + return parent.proceedSessionWrite(ses, cp, true, ackC); } /** @@ -670,20 +674,27 @@ class GridNioSslHandler extends ReentrantLock { */ private static class WriteRequest { /** Future that should be completed. */ - private GridNioEmbeddedFuture<Object> fut; + private final GridNioEmbeddedFuture<Object> fut; /** Buffer needed to be written. */ - private ByteBuffer buf; + private final ByteBuffer buf; + + /** */ + private final IgniteInClosure<IgniteException> ackC; /** * Creates write request. * * @param fut Future. * @param buf Buffer to write. + * @param ackC Closure invoked when message ACK is received. */ - private WriteRequest(GridNioEmbeddedFuture<Object> fut, ByteBuffer buf) { + private WriteRequest(GridNioEmbeddedFuture<Object> fut, + ByteBuffer buf, + IgniteInClosure<IgniteException> ackC) { this.fut = fut; this.buf = buf; + this.ackC = ackC; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 35568c3..fe915e5 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -369,7 +369,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Sending local node ID to newly accepted session: " + ses); try { - ses.sendNoFuture(nodeIdMessage()); + ses.sendNoFuture(nodeIdMessage(), null); } catch (IgniteCheckedException e) { U.error(log, "Failed to send message: " + e, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java index 8d88876..6005ac9 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java @@ -46,19 +46,19 @@ public class IgniteThread extends Thread { /** The name of the Ignite instance this thread belongs to. */ protected final String igniteInstanceName; - /** Group index. */ - private final int grpIdx; - /** */ private int compositeRwLockIdx; + /** */ + private final int stripe; + /** * Creates thread with given worker. * * @param worker Runnable to create thread with. */ public IgniteThread(GridWorker worker) { - this(DFLT_GRP, worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED); + this(DFLT_GRP, worker.igniteInstanceName(), worker.name(), worker, GRP_IDX_UNASSIGNED, -1); } /** @@ -69,7 +69,7 @@ public class IgniteThread extends Thread { * @param r Runnable to execute. */ public IgniteThread(String igniteInstanceName, String threadName, Runnable r) { - this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED); + this(igniteInstanceName, threadName, r, GRP_IDX_UNASSIGNED, -1); } /** @@ -79,9 +79,10 @@ public class IgniteThread extends Thread { * @param threadName Name of thread. * @param r Runnable to execute. * @param grpIdx Index within a group. + * @param stripe Non-negative stripe number if this thread is striped pool thread. */ - public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx) { - this(DFLT_GRP, igniteInstanceName, threadName, r, grpIdx); + public IgniteThread(String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) { + this(DFLT_GRP, igniteInstanceName, threadName, r, grpIdx, stripe); } /** @@ -93,14 +94,16 @@ public class IgniteThread extends Thread { * @param threadName Name of thread. * @param r Runnable to execute. * @param grpIdx Thread index within a group. + * @param stripe Non-negative stripe number if this thread is striped pool thread. */ - public IgniteThread(ThreadGroup grp, String igniteInstanceName, String threadName, Runnable r, int grpIdx) { + public IgniteThread(ThreadGroup grp, String igniteInstanceName, String threadName, Runnable r, int grpIdx, int stripe) { super(grp, r, createName(cntr.incrementAndGet(), threadName, igniteInstanceName)); A.ensure(grpIdx >= -1, "grpIdx >= -1"); this.igniteInstanceName = igniteInstanceName; - this.grpIdx = compositeRwLockIdx = grpIdx; + this.compositeRwLockIdx = grpIdx; + this.stripe = stripe; } /** @@ -112,18 +115,15 @@ public class IgniteThread extends Thread { super(threadGrp, threadName); this.igniteInstanceName = igniteInstanceName; - this.grpIdx = compositeRwLockIdx = GRP_IDX_UNASSIGNED; + this.compositeRwLockIdx = GRP_IDX_UNASSIGNED; + this.stripe = -1; } /** - * Gets name of the grid this thread belongs to. - * - * @return Name of the grid this thread belongs to. - * @deprecated use {@link #getIgniteInstanceName()} + * @return Non-negative stripe number if this thread is striped pool thread. */ - @Deprecated - public String getGridName() { - return getIgniteInstanceName(); + public int stripe() { + return stripe; } /** @@ -136,13 +136,6 @@ public class IgniteThread extends Thread { } /** - * @return Group index. - */ - public int groupIndex() { - return grpIdx; - } - - /** * @return Composite RW lock index. */ public int compositeRwLockIndex() { http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java index 119ef70..d2f0b15 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java @@ -61,7 +61,7 @@ public class IgniteThreadFactory implements ThreadFactory { /** {@inheritDoc} */ @Override public Thread newThread(@NotNull Runnable r) { - return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet()); + return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java index da2923f..8cbb596 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java @@ -47,7 +47,7 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest private static final int SAMPLE_CNT = 1; /** */ - private static final byte DIRECT_TYPE = (byte)210; + private static final byte DIRECT_TYPE = -127; /** */ private int bufSize; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java index 7c0e485..a158f7a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java @@ -40,6 +40,13 @@ public class CacheRebalancingSelfTest extends GridCommonAbstractTest { return cfg; } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + /** * @throws Exception If failed. */ @@ -63,13 +70,12 @@ public class CacheRebalancingSelfTest extends GridCommonAbstractTest { } /** - * @param future Future. + * @param fut Future. * @return Internal future. */ - private static IgniteInternalFuture internalFuture(IgniteFuture future) { - assert future instanceof IgniteFutureImpl; + private static IgniteInternalFuture internalFuture(IgniteFuture fut) { + assert fut instanceof IgniteFutureImpl : fut; - return ((IgniteFutureImpl)future).internalFuture(); + return ((IgniteFutureImpl) fut).internalFuture(); } - } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java index 52c0ac5..09a0d9a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java @@ -78,8 +78,6 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac cfg.setNetworkTimeout(60_000); - cfg.getTransactionConfiguration().setTxSerializableEnabled(true); - TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); discoSpi.setSocketTimeout(30_000); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java index 76cf78c..a12b6b9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java @@ -94,13 +94,6 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest /** * @throws Exception If failed. */ - public void testPartitionedClock() throws Exception { - checkMessages(false, CLOCK); - } - - /** - * @throws Exception If failed. - */ public void testPartitionedPrimary() throws Exception { checkMessages(false, PRIMARY); } @@ -108,13 +101,6 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest /** * @throws Exception If failed. */ - public void testClientClock() throws Exception { - checkMessages(true, CLOCK); - } - - /** - * @throws Exception If failed. - */ public void testClientPrimary() throws Exception { checkMessages(true, PRIMARY); } @@ -206,14 +192,14 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest private Map<Class<?>, AtomicInteger> cntMap = new HashMap<>(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { AtomicInteger cntr = cntMap.get(((GridIoMessage)msg).message().getClass()); if (cntr != null) cntr.incrementAndGet(); - super.sendMessage(node, msg, ackClosure); + super.sendMessage(node, msg, ackC); } /** @@ -221,7 +207,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest * * @param cls Class to count. */ - public void registerMessage(Class<?> cls) { + void registerMessage(Class<?> cls) { AtomicInteger cntr = cntMap.get(cls); if (cntr == null) @@ -232,7 +218,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest * @param cls Message type to get count. * @return Number of messages of given class. */ - public int messageCount(Class<?> cls) { + int messageCount(Class<?> cls) { AtomicInteger cntr = cntMap.get(cls); return cntr == null ? 0 : cntr.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index ba37974..dd27d72 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index 728bf13..a44e49e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -401,9 +401,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac assertEquals(3, msgs.size()); - for (Object msg : msgs) - assertTrue(((GridNearAtomicFullUpdateRequest)msg).clientRequest()); - map.put(primaryKey(ignite0.cache(null)), 3); map.put(primaryKey(ignite1.cache(null)), 4); map.put(primaryKey(ignite2.cache(null)), 5); @@ -1693,8 +1690,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac * @throws Exception If failed. */ public void testAtomicPrimaryPutAllMultinode() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1685"); - multinode(PRIMARY, TestType.PUT_ALL); } @@ -1702,8 +1697,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac * @throws Exception If failed. */ public void testAtomicClockPutAllMultinode() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1685"); - multinode(CLOCK, TestType.PUT_ALL); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java index cb1f6fb4..1d2cd2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java @@ -103,7 +103,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA for (int i = 0; i < GRID_CNT; i++) { final IgniteKernal grid = (IgniteKernal)grid(i); - GridTestUtils.retryAssert(log, 10, 100, new CA() { + GridTestUtils.retryAssert(log, 10, 500, new CA() { @Override public void apply() { assertTrue(grid.internalCache().context().mvcc().atomicFutures().isEmpty()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java index 5050300..2600e7b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java @@ -39,9 +39,11 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -702,8 +704,12 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ @SuppressWarnings("ConstantConditions") - private void checkEntry(Ignite ignite, Integer key, @Nullable Integer val, boolean expectNear, UUID... expReaders) - throws Exception { + private void checkEntry(Ignite ignite, + Integer key, + @Nullable Integer val, + boolean expectNear, + final UUID... expReaders) throws Exception + { GridCacheAdapter<Integer, Integer> near = ((IgniteKernal) ignite).internalCache(); assertTrue(near.isNear()); @@ -728,11 +734,22 @@ public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest { GridDhtCacheAdapter<Integer, Integer> dht = ((GridNearCacheAdapter<Integer, Integer>)near).dht(); - GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.peekEx(key); + final GridDhtCacheEntry dhtEntry = (GridDhtCacheEntry)dht.peekEx(key); if (expectDht) { assertNotNull("No dht entry for: " + key + ", grid: " + ignite.name(), dhtEntry); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + return dhtEntry.readers().size() == expReaders.length; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + }, 5000); + Collection<UUID> readers = dhtEntry.readers(); assertEquals(expReaders.length, readers.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index 3942e35..2971f81 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -43,10 +43,10 @@ import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; @@ -152,7 +152,12 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst @Override protected void afterTest() throws Exception { super.afterTest(); - ignite(0).destroyCache(null); + try { + checkInternalCleanup(); + } + finally { + ignite(0).destroyCache(null); + } } /** @@ -456,22 +461,40 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst for (int i = 0; i < keysCnt; i++) assertEquals((Integer)iter, cache.get(i)); + } + + /** + * @throws Exception If failed. + */ + private void checkInternalCleanup() throws Exception{ + checkNoAtomicFutures(); + + checkOnePhaseCommitReturnValuesCleaned(); + } + /** + * @throws Exception If failed. + */ + void checkNoAtomicFutures() throws Exception { for (int i = 0; i < GRID_CNT; i++) { - IgniteKernal ignite = (IgniteKernal)grid(i); + final IgniteKernal ignite = (IgniteKernal)grid(i); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ignite.context().cache().context().mvcc().atomicFuturesCount() == 0; + } + }, 5_000); Collection<?> futs = ignite.context().cache().context().mvcc().atomicFutures(); assertTrue("Unexpected atomic futures: " + futs, futs.isEmpty()); } - - checkOnePhaseCommitReturnValuesCleaned(); } /** - * + * @throws Exception If failed. */ - protected void checkOnePhaseCommitReturnValuesCleaned() throws IgniteInterruptedCheckedException { + void checkOnePhaseCommitReturnValuesCleaned() throws Exception { U.sleep(DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT); for (int i = 0; i < GRID_CNT; i++) { http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index cc5f548..7460828 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -208,7 +208,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr } /** - * + * @throws Exception If failed. */ public void testOriginatingNodeFailureForcesOnePhaseCommitDataCleanup() throws Exception { ignite(0).createCache(cacheConfiguration(TestMemoryMode.HEAP, false));
