IGNITE-1745: GridCacheIoManager should send a correct response in case of a cache query error
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7bedc8ac Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7bedc8ac Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7bedc8ac Branch: refs/heads/ignite-1770 Commit: 7bedc8aceefb94d1bd87620887a8a44025518abe Parents: 3ec52f3 Author: agura <[email protected]> Authored: Tue Oct 27 18:22:42 2015 +0300 Committer: agura <[email protected]> Committed: Thu Oct 29 15:36:15 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 28 +++++++++++++++-- ...niteCacheP2pUnmarshallingQueryErrorTest.java | 32 +++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7bedc8ac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index ec34f41..082f330 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -53,6 +53,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; +import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.GridSpinReadWriteLock; @@ -73,6 +75,9 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; * Cache communication manager. */ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { + /** Communication topic prefix for distributed queries. */ + private static final String QUERY_TOPIC_PREFIX = "QUERY"; + /** Message ID generator. */ private static final AtomicLong idGen = new AtomicLong(); @@ -304,8 +309,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** * Processes failed messages. * - * @param nodeId niode id. - * @param msg message. + * @param nodeId Node ID. + * @param msg Message. * @throws IgniteCheckedException If failed. */ private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException { @@ -493,6 +498,25 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; + case 58: { + GridCacheQueryRequest req = (GridCacheQueryRequest)msg; + + GridCacheQueryResponse res = new GridCacheQueryResponse( + req.cacheId(), + req.id(), + req.classError(), + cctx.deploymentEnabled()); + + cctx.io().sendOrderedMessage( + ctx.node(nodeId), + TOPIC_CACHE.topic(QUERY_TOPIC_PREFIX, nodeId, req.id()), + res, + ctx.ioPolicy(), + Long.MAX_VALUE); + } + + break; + default: throw new IgniteCheckedException("Failed to send response to node. 
Unsupported direct type [message=" + msg + "]"); http://git-wip-us.apache.org/repos/asf/ignite/blob/7bedc8ac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java index 6a4ba3a..07fa2bc 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingQueryErrorTest.java @@ -17,9 +17,13 @@ package org.apache.ignite.internal.processors.cache; +import java.io.IOException; +import java.io.ObjectInputStream; import javax.cache.CacheException; +import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; /** * Checks behavior on exception while unmarshalling key. @@ -47,10 +51,36 @@ public class IgniteCacheP2pUnmarshallingQueryErrorTest extends IgniteCacheP2pUnm try { jcache(0).query(new SqlQuery<TestKey, String>(String.class, "field like '" + key + "'")).getAll(); - assert false : "p2p marshalling failed, but error response was not sent"; + fail("p2p marshalling failed, but error response was not sent"); } catch (CacheException e) { // No-op } } + + /** + * @throws Exception If failed. 
+ */ + public void testResponseMessageOnRequestUnmarshallingFailed() throws Exception { + readCnt.set(Integer.MAX_VALUE); + + jcache(0).put(new TestKey(String.valueOf(++key)), ""); + + try { + jcache().query(new ScanQuery<>(new IgniteBiPredicate<TestKey, String>() { + @Override public boolean apply(TestKey key, String val) { + return false; + } + + private void readObject(ObjectInputStream is) throws IOException { + throw new IOException(); + } + })).getAll(); + + fail("Request unmarshalling failed, but error response was not sent."); + } + catch (Exception e) { + // No-op. + } + } } \ No newline at end of file
