Repository: ignite Updated Branches: refs/heads/master 08c563a14 -> 525a77547
IGNITE-9141: SQL: pass error message from mapper to reducer in case of mapping failure. This closes #4536. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/525a7754 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/525a7754 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/525a7754 Branch: refs/heads/master Commit: 525a77547d7f98b1a88ba98f60f328da3aa4947a Parents: 08c563a Author: SGrimstad <[email protected]> Authored: Tue Sep 4 11:57:07 2018 +0300 Committer: devozerov <[email protected]> Committed: Tue Sep 4 11:57:07 2018 +0300 ---------------------------------------------------------------------- .../messages/GridQueryNextPageResponse.java | 35 +- .../query/h2/twostep/GridMapQueryExecutor.java | 131 +++--- .../h2/twostep/GridReduceQueryExecutor.java | 75 ++-- .../query/h2/twostep/GridResultPage.java | 2 - .../query/h2/twostep/ReduceQueryRun.java | 117 +++++- ...sappearedCacheCauseRetryMessageSelfTest.java | 134 ++++++ ...appearedCacheWasNotFoundMessageSelfTest.java | 123 ++++++ .../query/h2/twostep/JoinSqlTestHelper.java | 163 ++++++++ .../NonCollocatedRetryMessageSelfTest.java | 146 +++++++ .../h2/twostep/RetryCauseMessageSelfTest.java | 416 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite2.java | 9 + 11 files changed, 1236 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java index 2d1ec36..6b976c2 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java @@ -67,6 +67,9 @@ public class GridQueryNextPageResponse implements Message { /** */ private AffinityTopologyVersion retry; + /** Retry cause description. */ + private String retryCause; + /** Last page flag. */ private boolean last; @@ -235,6 +238,12 @@ public class GridQueryNextPageResponse implements Message { writer.incrementState(); case 9: + if (!writer.writeString("retryCause", retryCause)) + return false; + + writer.incrementState(); + + case 10: if (!writer.writeBoolean("removeMapping", removeMapping)) return false; @@ -323,13 +332,23 @@ public class GridQueryNextPageResponse implements Message { return false; reader.incrementState(); + case 9: + retryCause = reader.readString("retryCause"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: removeMapping = reader.readBoolean("removeMapping"); if (!reader.isLastRead()) return false; reader.incrementState(); + } return reader.afterMessageRead(GridQueryNextPageResponse.class); @@ -342,7 +361,7 @@ public class GridQueryNextPageResponse implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 10; + return 11; } /** @@ -360,6 +379,20 @@ public class GridQueryNextPageResponse implements Message { } /** + * @return Retry cause message. + */ + public String retryCause() { + return retryCause; + } + + /** + * @param retryCause Retry cause message. + */ + public void retryCause(String retryCause){ + this.retryCause = retryCause; + } + + /** * @return Last page flag.
*/ public boolean last() { http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 2402247..b4d6f00 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -312,10 +312,10 @@ public class GridMapQueryExecutor { * @param reserved Reserved list. * @param nodeId Node ID. * @param reqId Request ID. - * @return {@code true} If all the needed partitions successfully reserved. + * @return String which is null in case of success or with causeMessage if failed * @throws IgniteCheckedException If failed. */ - private boolean reservePartitions( + private String reservePartitions( @Nullable List<Integer> cacheIds, AffinityTopologyVersion topVer, final int[] explicitParts, @@ -326,7 +326,7 @@ public class GridMapQueryExecutor { assert topVer != null; if (F.isEmpty(cacheIds)) - return true; + return null; Collection<Integer> partIds = wrap(explicitParts); @@ -335,11 +335,11 @@ public class GridMapQueryExecutor { // Cache was not found, probably was not deployed yet. 
if (cctx == null) { - logRetry("Failed to reserve partitions for query (cache is not found on local node) [" + - "rmtNodeId=" + nodeId + ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + - cacheIds.get(i) + "]"); + final String res = String.format("Failed to reserve partitions for query (cache is not found on " + + "local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]", + ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i)); - return false; + return res; } if (cctx.isLocal() || !cctx.rebalanceEnabled()) @@ -352,13 +352,10 @@ public class GridMapQueryExecutor { if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. if (r != MapReplicatedReservation.INSTANCE) { - if (!r.reserve()) { - logRetry("Failed to reserve partitions for query (group reservation failed) [" + - "rmtNodeId=" + nodeId + ", reqId=" + reqId + ", affTopVer=" + topVer + - ", cacheId=" + cacheIds.get(i) + ", cacheName=" + cctx.name() + "]"); - - return false; // We need explicit partitions here -> retry. - } + if (!r.reserve()) + return String.format("Failed to reserve partitions for query (group " + + "reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + + "cacheName=%s]",ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name()); reserved.add(r); } @@ -374,15 +371,21 @@ public class GridMapQueryExecutor { // We don't need to reserve partitions because they will not be evicted in replicated caches. GridDhtPartitionState partState = part != null ? 
part.state() : null; - if (partState != OWNING) { - logRetry("Failed to reserve partitions for query (partition of " + - "REPLICATED cache is not in OWNING state) [rmtNodeId=" + nodeId + - ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) + - ", cacheName=" + cctx.name() + ", part=" + p + ", partFound=" + (part != null) + - ", partState=" + partState + "]"); - - return false; - } + if (partState != OWNING) + return String.format("Failed to reserve partitions for query " + + "(partition of REPLICATED cache is not in OWNING state) [" + + "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, cacheName=%s, " + + "part=%s, partFound=%s, partState=%s]", + ctx.localNodeId(), + nodeId, + reqId, + topVer, + cacheIds.get(i), + cctx.name(), + p, + (part != null), + partState + ); } // Mark that we checked this replicated cache. @@ -398,29 +401,41 @@ public class GridMapQueryExecutor { GridDhtPartitionState partState = part != null ? part.state() : null; - if (partState != OWNING || !part.reserve()) { - logRetry("Failed to reserve partitions for query (partition of " + - "PARTITIONED cache cannot be reserved) [rmtNodeId=" + nodeId + ", reqId=" + reqId + - ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) + - ", cacheName=" + cctx.name() + ", part=" + partId + ", partFound=" + (part != null) + - ", partState=" + partState + "]"); - - return false; - } + if (partState != OWNING || !part.reserve()) + return String.format("Failed to reserve partitions for query " + + "(partition of PARTITIONED cache cannot be reserved) [" + + "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, cacheName=%s, " + + "part=%s, partFound=%s, partState=%s]", + ctx.localNodeId(), + nodeId, + reqId, + topVer, + cacheIds.get(i), + cctx.name(), + partId, + (part != null), + partState + ); reserved.add(part); // Double check that we are still in owning state and partition contents are not cleared. 
partState = part.state(); - if (part.state() != OWNING) { - logRetry("Failed to reserve partitions for query (partition of " + - "PARTITIONED cache is not in OWNING state after reservation) [rmtNodeId=" + nodeId + - ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) + - ", cacheName=" + cctx.name() + ", part=" + partId + ", partState=" + partState + "]"); - - return false; - } + if (part.state() != OWNING) + return String.format("Failed to reserve partitions for query " + + "(partition of PARTITIONED cache is not in OWNING state after reservation) [" + + "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, cacheName=%s, " + + "part=%s, partState=%s]", + ctx.localNodeId(), + nodeId, + reqId, + topVer, + cacheIds.get(i), + cctx.name(), + partId, + partState + ); } if (explicitParts == null) { @@ -442,16 +457,7 @@ public class GridMapQueryExecutor { } } - return true; - } - - /** - * Load failed partition reservation. - * - * @param msg Message. - */ - private void logRetry(String msg) { - log.info(msg); + return null; } /** @@ -783,12 +789,14 @@ public class GridMapQueryExecutor { // otherwise, their state is protected by locked topology. if (topVer != null && txDetails == null) { // Reserve primary for topology version or explicit partitions. - if (!reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId)) { + String err = reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId); + + if (!F.isEmpty(err)) { // Unregister lazy worker because re-try may never reach this node again. 
if (lazy) stopAndUnregisterCurrentLazyWorker(); - sendRetry(node, reqId, segmentId); + sendRetry(node, reqId, segmentId, err); return; } @@ -975,10 +983,12 @@ public class GridMapQueryExecutor { GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class); if (retryErr != null) { - logRetry("Failed to execute non-collocated query (will retry) [nodeId=" + node.id() + - ", reqId=" + reqId + ", errMsg=" + retryErr.getMessage() + ']'); + final String retryCause = String.format( + "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " + + "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage() + ); - sendRetry(node, reqId, segmentId); + sendRetry(node, reqId, segmentId, retryCause); } else { U.error(log, "Failed to execute local query.", e); @@ -1035,13 +1045,15 @@ public class GridMapQueryExecutor { List<GridReservable> reserved = new ArrayList<>(); - if (!reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId)) { + String err = reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId); + + if (!F.isEmpty(err)) { U.error(log, "Failed to reserve partitions for DML request. [localNodeId=" + ctx.localNodeId() + ", nodeId=" + node.id() + ", reqId=" + req.requestId() + ", cacheIds=" + cacheIds + ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); - sendUpdateResponse(node, reqId, null, "Failed to reserve partitions for DML request. " + - "Explanation (Retry your request when re-balancing is over)."); + sendUpdateResponse(node, reqId, null, + "Failed to reserve partitions for DML request. " + err); return; } @@ -1299,7 +1311,7 @@ public class GridMapQueryExecutor { * @param reqId Request ID. * @param segmentId Index segment ID. 
*/ - private void sendRetry(ClusterNode node, long reqId, int segmentId) { + private void sendRetry(ClusterNode node, long reqId, int segmentId, String retryCause) { try { boolean loc = node.isLocal(); @@ -1310,6 +1322,7 @@ public class GridMapQueryExecutor { false); msg.retry(h2.readyTopologyVersion()); + msg.retryCause(retryCause); if (loc) h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 910ad1a..586fb51 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -65,7 +65,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLo import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture; import org.apache.ignite.internal.processors.cache.distributed.near.TxTopologyVersionFuture; import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; -import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; @@ -234,8 +233,7 @@ public class GridReduceQueryExecutor { * @param nodeId Left node ID. 
*/ private void handleNodeLeft(ReduceQueryRun r, UUID nodeId) { - // Will attempt to retry. If reduce query was started it will fail on next page fetching. - retry(r, h2.readyTopologyVersion(), nodeId); + r.setStateOnNodeLeave(nodeId, h2.readyTopologyVersion()); } /** @@ -287,12 +285,13 @@ public class GridReduceQueryExecutor { */ private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) { if (r != null) { - CacheException e = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg); + CacheException e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId + + ", errMsg=" + msg + ']'); if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR) e.addSuppressed(new QueryCancelledException()); - r.state(e, nodeId); + r.setStateOnException(nodeId, e); } } @@ -300,7 +299,7 @@ public class GridReduceQueryExecutor { * @param node Node. * @param msg Message. */ - private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) { + private void onNextPage(final ClusterNode node, final GridQueryNextPageResponse msg) { final long qryReqId = msg.queryRequestId(); final int qry = msg.query(); final int seg = msg.segmentId(); @@ -319,20 +318,13 @@ public class GridReduceQueryExecutor { try { page = new GridResultPage(ctx, node.id(), msg) { @Override public void fetchNextPage() { - Object errState = r.state(); + if (r.hasErrorOrRetry()) { + if (r.exception() != null) + throw r.exception(); - if (errState != null) { - CacheException err0 = errState instanceof CacheException ? 
(CacheException)errState : null; + assert r.retryCause() != null; - if (err0 != null && err0.getCause() instanceof IgniteClientDisconnectedException) - throw err0; - - CacheException e = new CacheException("Failed to fetch data from node: " + node.id()); - - if (err0 != null) - e.addSuppressed(err0); - - throw e; + throw new CacheException(r.retryCause()); } try { @@ -360,7 +352,7 @@ public class GridReduceQueryExecutor { idx.addPage(page); if (msg.retry() != null) - retry(r, msg.retry(), node.id()); + r.setStateOnRetry(node.id(), msg.retry(), msg.retryCause()); else if (msg.page() == 0) { // Do count down on each first page received. r.latch().countDown(); @@ -373,20 +365,14 @@ public class GridReduceQueryExecutor { } /** - * @param r Query run. - * @param retryVer Retry version. - * @param nodeId Node ID. - */ - private void retry(ReduceQueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) { - r.state(retryVer, nodeId); - } - - /** * @param cacheIds Cache IDs. * @return {@code true} If preloading is active. */ private boolean isPreloadingActive(List<Integer> cacheIds) { for (Integer cacheId : cacheIds) { + if (null == cacheContext(cacheId)) + throw new CacheException(String.format("Cache not found on local node [cacheId=%d]", cacheId)); + if (hasMovingPartitions(cacheContext(cacheId))) return true; } @@ -399,6 +385,8 @@ public class GridReduceQueryExecutor { * @return {@code True} If cache has partitions in {@link GridDhtPartitionState#MOVING} state. 
*/ private boolean hasMovingPartitions(GridCacheContext<?, ?> cctx) { + assert cctx != null; + return !cctx.isLocal() && cctx.topology().hasMovingPartitions(); } @@ -593,9 +581,18 @@ public class GridReduceQueryExecutor { final long startTime = U.currentTimeMillis(); + ReduceQueryRun lastRun = null; + for (int attempt = 0;; attempt++) { - if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout)) - throw new CacheException("Failed to map SQL query to topology."); + if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout)) { + UUID retryNodeId = lastRun.retryNodeId(); + String retryCause = lastRun.retryCause(); + + assert !F.isEmpty(retryCause); + + throw new CacheException("Failed to map SQL query to topology on data node [dataNodeId=" + retryNodeId + + ", msg=" + retryCause + ']'); + } if (attempt != 0) { try { @@ -877,29 +874,26 @@ public class GridReduceQueryExecutor { if (send(nodes, req, spec, false)) { awaitAllReplies(r, nodes, cancel); - Object state = r.state(); - - if (state != null) { - if (state instanceof CacheException) { - CacheException err = (CacheException)state; + if (r.hasErrorOrRetry()) { + CacheException err = r.exception(); + if (err != null) { if (err.getCause() instanceof IgniteClientDisconnectedException) throw err; if (wasCancelled(err)) throw new QueryCancelledException(); // Throw correct exception. - throw new CacheException("Failed to run map query remotely." + err.getMessage(), err); + throw new CacheException("Failed to run map query remotely: " + err.getMessage(), err); } - - if (state instanceof AffinityTopologyVersion) { + else { retry = true; // On-the-fly topology change must not be possible in FOR UPDATE case. assert sfuFut == null; // If remote node asks us to retry then we have outdated full partition map. 
- h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state); + h2.awaitForReadyTopologyVersion(r.retryTopologyVersion()); } } } @@ -952,6 +946,9 @@ public class GridReduceQueryExecutor { } } else { + assert r != null; + lastRun=r; + if (Thread.currentThread().isInterrupted()) throw new IgniteInterruptedCheckedException("Query was interrupted."); http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java index 3c17640..0cb986b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java @@ -72,8 +72,6 @@ public class GridResultPage { Collection<?> plainRows = res.plainRows(); if (plainRows != null) { - assert plainRows instanceof ArrayList; - rowsInPage = plainRows.size(); if (rowsInPage == 0 || ((ArrayList<Value[]>)plainRows).get(0).length == res.columns()) http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java index df72e8c..7ddd653 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java +++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSe import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; +import org.apache.ignite.internal.util.typedef.F; import org.h2.jdbc.JdbcConnection; import org.jetbrains.annotations.Nullable; @@ -53,8 +54,8 @@ class ReduceQueryRun { /** */ private final int pageSize; - /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */ - private final AtomicReference<Object> state = new AtomicReference<>(); + /** */ + private final AtomicReference<State> state = new AtomicReference<>(); /** Future controlling {@code SELECT FOR UPDATE} query execution. */ private final GridNearTxSelectForUpdateFuture selectForUpdateFut; @@ -86,30 +87,59 @@ class ReduceQueryRun { } /** - * @param o Fail state object. + * Set state on exception. + * + * @param err error. + * @param nodeId Node ID. + */ + void setStateOnException(@Nullable UUID nodeId, CacheException err) { + setState0(new State(nodeId, err, null, null)); + } + + /** + * Set state on map node leave. + * + * @param nodeId Node ID. + * @param topVer Topology version. + */ + void setStateOnNodeLeave(UUID nodeId, AffinityTopologyVersion topVer) { + setState0(new State(nodeId, null, topVer, "Data node has left the grid during query execution [nodeId=" + + nodeId + ']')); + } + + /** + * Set state on retry due to mapping failure. + * * @param nodeId Node ID. + * @param topVer Topology version. + * @param retryCause Retry cause. 
*/ - void state(Object o, @Nullable UUID nodeId) { - assert o != null; - assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass(); + void setStateOnRetry(UUID nodeId, AffinityTopologyVersion topVer, String retryCause) { + assert !F.isEmpty(retryCause); - if (!state.compareAndSet(null, o)) + setState0(new State(nodeId, null, topVer, retryCause)); + } + + /** + * + * @param state state + */ + private void setState0(State state){ + if (!this.state.compareAndSet(null, state)) return; while (latch.getCount() != 0) // We don't need to wait for all nodes to reply. latch.countDown(); - CacheException e = o instanceof CacheException ? (CacheException) o : null; - for (GridMergeIndex idx : idxs) // Fail all merge indexes. - idx.fail(nodeId, e); + idx.fail(state.nodeId, state.ex); } /** * @param e Error. */ void disconnected(CacheException e) { - state(e, null); + setStateOnException(null, e); } /** @@ -133,11 +163,45 @@ class ReduceQueryRun { return conn; } + /** */ + boolean hasErrorOrRetry(){ + return state.get() != null; + } + /** - * @return State. + * @return Exception. */ - Object state() { - return state.get(); + CacheException exception() { + State st = state.get(); + + return st != null ? st.ex : null; + } + + /** + * @return Retry topology version. + */ + AffinityTopologyVersion retryTopologyVersion(){ + State st = state.get(); + + return st != null ? st.retryTopVer : null; + } + + /** + * @return Retry node ID. + */ + UUID retryNodeId() { + State st = state.get(); + + return st != null ? st.nodeId : null; + } + + /** + * @return Retry cause. + */ + String retryCause(){ + State st = state.get(); + + return st != null ? st.retryCause : null; } /** @@ -167,4 +231,29 @@ class ReduceQueryRun { @Nullable public GridNearTxSelectForUpdateFuture selectForUpdateFuture() { return selectForUpdateFut; } + + /** + * Error state. + */ + private static class State { + /** Affected node (may be null in case of local node failure).
*/ + private final UUID nodeId; + + /** Error. */ + private final CacheException ex; + + /** Retry topology version. */ + private final AffinityTopologyVersion retryTopVer; + + /** Retry cause. */ + private final String retryCause; + + /** */ + private State(UUID nodeId, CacheException ex, AffinityTopologyVersion retryTopVer, String retryCause){ + this.nodeId = nodeId; + this.ex = ex; + this.retryTopVer = retryTopVer; + this.retryCause = retryCause; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java new file mode 100644 index 0000000..8c4358a --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import javax.cache.CacheException; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT; +import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Organization; +import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Person; + +/** + * Failed to reserve partitions for query (cache is not found on local node) Root cause test + */ +public class DisappearedCacheCauseRetryMessageSelfTest extends GridCommonAbstractTest { + /** */ + private static final int NODES_COUNT = 2; + /** */ + private static final String ORG = "org"; + /** */ + private IgniteCache<String, JoinSqlTestHelper.Person> personCache; + /** */ + private IgniteCache<String, JoinSqlTestHelper.Organization> orgCache; + + /** */ + public void testDisappearedCacheCauseRetryMessage() { + + SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, 
JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0"); + + qry.setDistributedJoins(true); + + try { + personCache.query(qry).getAll(); + + fail("No CacheException emitted."); + } + catch (CacheException e) { + assertTrue(e.getMessage(), e.getMessage().contains("Failed to reserve partitions for query (cache is not found on local node) [")); + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TcpCommunicationSpi(){ + + volatile long reqId = -1; + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) { + assert msg != null; + + if ( GridIoMessage.class.isAssignableFrom(msg.getClass())){ + GridIoMessage gridMsg = (GridIoMessage)msg; + + if ( GridH2QueryRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){ + GridH2QueryRequest req = (GridH2QueryRequest) (gridMsg.message()); + reqId = req.requestId(); + orgCache.destroy(); + } + else if ( GridQueryCancelRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){ + GridQueryCancelRequest req = (GridQueryCancelRequest) (gridMsg.message()); + + if (reqId == req.queryRequestId()) + orgCache = DisappearedCacheCauseRetryMessageSelfTest.this.ignite(0).getOrCreateCache(new CacheConfiguration<String, Organization>(ORG) + .setCacheMode(CacheMode.REPLICATED) + .setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class) + ); + + } + } + + super.sendMessage(node, msg, ackC); + } + }); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000"); + + startGridsMultiThreaded(NODES_COUNT, false); + + personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Person>("pers") + 
.setIndexedTypes(String.class, JoinSqlTestHelper.Person.class) + ); + + orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Organization>(ORG) + .setCacheMode(CacheMode.REPLICATED) + .setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class) + ); + + awaitPartitionMapExchange(); + + JoinSqlTestHelper.populateDataIntoOrg(orgCache); + + JoinSqlTestHelper.populateDataIntoPerson(personCache); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheWasNotFoundMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheWasNotFoundMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheWasNotFoundMessageSelfTest.java new file mode 100644 index 0000000..9928ed6 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheWasNotFoundMessageSelfTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT; +import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Organization; +import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Person; + +/** + * Grid cache context is not registered for cache id root cause message test + */ +public class DisappearedCacheWasNotFoundMessageSelfTest extends GridCommonAbstractTest { + /** */ + private static final int NODES_COUNT = 2; + /** */ + private static final String ORG = "org"; + /** */ + private IgniteCache<String, JoinSqlTestHelper.Person> personCache; + /** */ + private IgniteCache<String, JoinSqlTestHelper.Organization> orgCache; + + /** */ + public void testDisappearedCacheWasNotFoundMessage() { + SqlQuery<String, Person> qry = new SqlQuery<String, Person>(Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0"); + + qry.setDistributedJoins(true); + + try { + 
personCache.query(qry).getAll(); + + fail("No CacheException emitted."); + } + catch (CacheException e) { + assertTrue(e.getMessage(), e.getMessage().contains("Cache not found on local node")); + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TcpCommunicationSpi(){ + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) { + assert msg != null; + + if ( GridIoMessage.class.isAssignableFrom(msg.getClass())){ + GridIoMessage gridMsg = (GridIoMessage)msg; + + if ( GridH2QueryRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){ + GridH2QueryRequest req = (GridH2QueryRequest) (gridMsg.message()); + + req.requestId(); + + orgCache.destroy(); + } + } + + super.sendMessage(node, msg, ackC); + } + }); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000"); + + startGridsMultiThreaded(NODES_COUNT, false); + + personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Person>("pers") + .setIndexedTypes(String.class, JoinSqlTestHelper.Person.class) + ); + + orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Organization>(ORG) + .setCacheMode(CacheMode.REPLICATED) + .setIndexedTypes(String.class, Organization.class) + ); + + awaitPartitionMapExchange(); + + JoinSqlTestHelper.populateDataIntoOrg(orgCache); + + JoinSqlTestHelper.populateDataIntoPerson(personCache); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java 
---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java new file mode 100644 index 0000000..fe7821a --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.annotations.QuerySqlField; + +/** + * Join sql test helper + */ +public class JoinSqlTestHelper { + /** */ + private static final int ORG_COUNT = 100; + + /** */ + private static final int PERSON_PER_ORG_COUNT = 10; + + /** */ + static final String JOIN_SQL = "select * from Person, \"org\".Organization as org " + + "where Person.orgId = org.id " + + "and lower(org.name) = lower(?)"; + + /** + * Populate organization cache with test data + * @param cache @{IgniteCache} + */ + static void populateDataIntoOrg(IgniteCache<String, Organization> cache) { + for (int i = 0; i < ORG_COUNT; i++) { + Organization org = new Organization(); + + org.setId("org" + i); + + org.setName("Organization #" + i); + + cache.put(org.getId(), org); + } + } + + /** + * Populate person cache with test data + * @param cache @{IgniteCache} + */ + static void populateDataIntoPerson(IgniteCache<String, Person> cache) { + int personId = 0; + + for (int i = 0; i < ORG_COUNT; i++) { + Organization org = new Organization(); + + org.setId("org" + i); + + org.setName("Organization #" + i); + + for (int j = 0; j < PERSON_PER_ORG_COUNT; j++) { + Person prsn = new Person(); + + prsn.setId("pers" + personId); + + prsn.setOrgId(org.getId()); + + prsn.setName("Person name #" + personId); + + cache.put(prsn.getId(), prsn); + + personId++; + } + } + } + + /** + * + */ + public static class Person { + /** */ + @QuerySqlField(index = true) + private String id; + + /** */ + @QuerySqlField(index = true) + private String orgId; + + /** */ + @QuerySqlField(index = true) + private String name; + + /** */ + public String getId() { + return id; + } + + /** */ + public void setId(String id) { + this.id = id; + } + + /** */ + public String getOrgId() { + return orgId; + } + + /** */ + public void setOrgId(String orgId) { + this.orgId = orgId; + } + + /** */ + public 
String getName() { + return name; + } + + /** */ + public void setName(String name) { + this.name = name; + } + } + + /** + * + */ + public static class Organization { + /** */ + @QuerySqlField(index = true) + private String id; + + /** */ + @QuerySqlField(index = true) + private String name; + + /** */ + public void setId(String id) { + this.id = id; + } + + /** */ + public String getId() { + return id; + } + + /** */ + public String getName() { + return name; + } + + /** */ + public void setName(String name) { + this.name = name; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java new file mode 100644 index 0000000..c602225 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import java.util.List; +import javax.cache.Cache; +import javax.cache.CacheException; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT; + +/** + * Failed to execute non-collocated query root cause message test + */ +public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest { + /** */ + private static final int NODES_COUNT = 3; + + /** */ + private static final String ORG = "org"; + + /** */ + private IgniteCache<String, JoinSqlTestHelper.Person> personCache; + + /** */ + public void testNonCollocatedRetryMessage() { + SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0"); + + qry.setDistributedJoins(true); + + try { + 
List<Cache.Entry<String,JoinSqlTestHelper.Person>> prsns = personCache.query(qry).getAll(); + fail("No CacheException emitted. Collection size="+prsns.size()); + } + catch (CacheException e) { + assertTrue(e.getMessage(), e.getMessage().contains("Failed to execute non-collocated query")); + } + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TcpCommunicationSpi(){ + volatile long reqId = -1; + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) { + assert msg != null; + + if ( GridIoMessage.class.isAssignableFrom(msg.getClass())){ + GridIoMessage gridMsg = (GridIoMessage)msg; + + if ( GridH2QueryRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){ + GridH2QueryRequest req = (GridH2QueryRequest) (gridMsg.message()); + + if (reqId < 0) { + reqId = req.requestId(); + + String shutName = getTestIgniteInstanceName(1); + + stopGrid(shutName, true, false); + } + else if( reqId != req.requestId() ){ + try { + U.sleep(IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, GridReduceQueryExecutor.DFLT_RETRY_TIMEOUT)); + } + catch (IgniteInterruptedCheckedException e) { + // no-op + } + } + } + } + super.sendMessage(node, msg, ackC); + } + }); + + cfg.setDiscoverySpi(new TcpDiscoverySpi(){ + public long getNodesJoined() { + return stats.joinedNodesCount(); + } + }); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000"); + + startGridsMultiThreaded(NODES_COUNT, false); + + personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, JoinSqlTestHelper.Person>("pers") + .setBackups(1) + .setIndexedTypes(String.class, JoinSqlTestHelper.Person.class) + ); + + final IgniteCache<String, 
JoinSqlTestHelper.Organization> orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, JoinSqlTestHelper.Organization>(ORG) + .setBackups(1) + .setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class) + ); + + awaitPartitionMapExchange(); + + JoinSqlTestHelper.populateDataIntoOrg(orgCache); + + JoinSqlTestHelper.populateDataIntoPerson(personCache); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java new file mode 100644 index 0000000..3269887 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; +import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.JOIN_SQL; +import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Organization; +import static 
org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Person; + +/** + * Test for 6 retry cases + */ +public class RetryCauseMessageSelfTest extends GridCommonAbstractTest { + /** */ + private static final int NODES_COUNT = 2; + + /** */ + private static final String ORG_SQL = "select * from Organization"; + + /** */ + private static final String ORG = "org"; + + /** */ + private IgniteCache<String, Person> personCache; + + /** */ + private IgniteCache<String, Organization> orgCache; + + /** */ + private IgniteH2Indexing h2Idx; + + /** */ + @Override protected long getTestTimeout() { + return 600 * 1000; + } + + /** + * Failed to reserve partitions for query (cache is not found on local node) + */ + public void testSynthCacheWasNotFoundMessage() { + GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + GridH2QueryRequest qryReq = (GridH2QueryRequest)msg; + + qryReq.caches().add(Integer.MAX_VALUE); + + startedExecutor.onMessage(nodeId, msg); + + qryReq.caches().remove(qryReq.caches().size() - 1); + } + else + startedExecutor.onMessage(nodeId, msg); + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery<String, Person> qry = new SqlQuery<String, Person>(Person.class, JOIN_SQL).setArgs("Organization #0"); + + qry.setDistributedJoins(true); + + try { + personCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage(), e.getMessage().contains("Failed to reserve partitions for query (cache is not found on local node) [")); + + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** + * Failed to reserve partitions for query (group reservation failed) 
+ */ + public void testGrpReservationFailureMessage() { + final GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + + final ConcurrentMap<MapReservationKey, GridReservable> reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations"); + + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + final MapReservationKey grpKey = new MapReservationKey(ORG, null); + + reservations.put(grpKey, new GridReservable() { + + @Override public boolean reserve() { + return false; + } + + @Override public void release() {} + }); + } + startedExecutor.onMessage(nodeId, msg); + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery<String, Person> qry = new SqlQuery<String, Person>(Person.class, JOIN_SQL).setArgs("Organization #0"); + + qry.setDistributedJoins(true); + + try { + personCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage().contains("Failed to reserve partitions for query (group reservation failed) [")); + + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** + * Failed to reserve partitions for query (partition of REPLICATED cache is not in OWNING state) + */ + public void testReplicatedCacheReserveFailureMessage() { + GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + + final GridKernalContext ctx = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "ctx"); + + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + 
GridH2QueryRequest qryReq = (GridH2QueryRequest)msg; + + GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(qryReq.caches().get(0)); + + GridDhtLocalPartition part = cctx.topology().localPartition(0, NONE, false); + + AtomicLong aState = GridTestUtils.getFieldValue(part, GridDhtLocalPartition.class, "state"); + + long stateVal = aState.getAndSet(2); + + startedExecutor.onMessage(nodeId, msg); + + aState.getAndSet(stateVal); + } + else + startedExecutor.onMessage(nodeId, msg); + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery<String, Organization> qry = new SqlQuery<>(Organization.class, ORG_SQL); + + qry.setDistributedJoins(true); + try { + orgCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage().contains("Failed to reserve partitions for query (partition of REPLICATED cache is not in OWNING state) [")); + + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** + * Failed to reserve partitions for query (partition of PARTITIONED cache cannot be reserved) + */ + public void testPartitionedCacheReserveFailureMessage() { + GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + + final GridKernalContext ctx = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "ctx"); + + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + GridH2QueryRequest qryReq = (GridH2QueryRequest)msg; + + GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(qryReq.caches().get(0)); + + GridDhtLocalPartition part = cctx.topology().localPartition(0, NONE, false); + + AtomicLong aState = GridTestUtils.getFieldValue(part, GridDhtLocalPartition.class, "state"); + + long stateVal = 
aState.getAndSet(2); + + startedExecutor.onMessage(nodeId, msg); + + aState.getAndSet(stateVal); + } + else + startedExecutor.onMessage(nodeId, msg); + + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery<String, Person> qry = new SqlQuery<String, Person>(Person.class, JOIN_SQL).setArgs("Organization #0"); + + qry.setDistributedJoins(true); + try { + personCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage().contains("Failed to reserve partitions for query (partition of PARTITIONED cache cannot be reserved) [")); + + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** + * Failed to execute non-collocated query (will retry) + */ + public void testNonCollocatedFailureMessage() { + final GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + + final ConcurrentMap<MapReservationKey, GridReservable> reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations"); + + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", + new MockGridMapQueryExecutor(null) { + @Override public void onMessage(UUID nodeId, Object msg) { + if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { + final MapReservationKey grpKey = new MapReservationKey(ORG, null); + + reservations.put(grpKey, new GridReservable() { + + @Override public boolean reserve() { + throw new GridH2RetryException("test retry exception"); + } + + @Override public void release() { + } + }); + } + startedExecutor.onMessage(nodeId, msg); + + } + }.insertRealExecutor(mapQryExec)); + + SqlQuery<String, Person> qry = new SqlQuery<String, Person>(Person.class, JOIN_SQL).setArgs("Organization #0"); + + qry.setDistributedJoins(true); + try { + personCache.query(qry).getAll(); + } + catch (CacheException e) { + assertTrue(e.getMessage().contains("Failed to execute non-collocated query (will 
retry) [")); + + return; + } + finally { + GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec); + } + fail(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TcpCommunicationSpi(){ + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) { + assert msg != null; + + super.sendMessage(node, msg, ackC); + } + }); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000"); + + Ignite ignite = startGridsMultiThreaded(NODES_COUNT, false); + + GridQueryProcessor qryProc = grid(ignite.name()).context().query(); + + h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx"); + + personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Person>("pers") + .setIndexedTypes(String.class, Person.class) + ); + + orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Organization>(ORG) + .setCacheMode(CacheMode.REPLICATED) + .setIndexedTypes(String.class, Organization.class) + ); + + awaitPartitionMapExchange(); + + JoinSqlTestHelper.populateDataIntoOrg(orgCache); + + JoinSqlTestHelper.populateDataIntoPerson(personCache); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + + /** + * Wrapper around @{GridMapQueryExecutor} + */ + private abstract static class MockGridMapQueryExecutor extends GridMapQueryExecutor { + + /** + * Wrapped executor + */ + GridMapQueryExecutor startedExecutor; + + /** */ + MockGridMapQueryExecutor insertRealExecutor(GridMapQueryExecutor realExecutor) { + this.startedExecutor = realExecutor; + return this; + } + + /** + * @param busyLock Busy lock. 
+ */ + MockGridMapQueryExecutor(GridSpinBusyLock busyLock) { + super(busyLock); + } + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + startedExecutor.onMessage(nodeId, msg); + } + + /** {@inheritDoc} */ + @Override public void cancelLazyWorkers() { + startedExecutor.cancelLazyWorkers(); + } + + /** {@inheritDoc} */ + @Override GridSpinBusyLock busyLock() { + return startedExecutor.busyLock(); + } + + /** {@inheritDoc} */ + @Override public void onCacheStop(String cacheName) { + startedExecutor.onCacheStop(cacheName); + } + + /** {@inheritDoc} */ + @Override public void stopAndUnregisterCurrentLazyWorker() { + startedExecutor.stopAndUnregisterCurrentLazyWorker(); + } + + /** {@inheritDoc} */ + @Override public void unregisterLazyWorker(MapQueryLazyWorker worker) { + startedExecutor.unregisterLazyWorker(worker); + } + + /** {@inheritDoc} */ + @Override public int registeredLazyWorkers() { + return startedExecutor.registeredLazyWorkers(); + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java index 093423d..536834c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java @@ -51,6 +51,10 @@ import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlDistribut import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexMultiNodeSelfTest; import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexSelfTest; import 
org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLeakTest; +import org.apache.ignite.internal.processors.query.h2.twostep.DisappearedCacheCauseRetryMessageSelfTest; +import org.apache.ignite.internal.processors.query.h2.twostep.DisappearedCacheWasNotFoundMessageSelfTest; +import org.apache.ignite.internal.processors.query.h2.twostep.NonCollocatedRetryMessageSelfTest; +import org.apache.ignite.internal.processors.query.h2.twostep.RetryCauseMessageSelfTest; import org.apache.ignite.testframework.IgniteTestSuite; /** @@ -110,6 +114,11 @@ public class IgniteCacheQuerySelfTestSuite2 extends TestSuite { suite.addTestSuite(CacheQueryMemoryLeakTest.class); + suite.addTestSuite(NonCollocatedRetryMessageSelfTest.class); + suite.addTestSuite(RetryCauseMessageSelfTest.class); + suite.addTestSuite(DisappearedCacheCauseRetryMessageSelfTest.class); + suite.addTestSuite(DisappearedCacheWasNotFoundMessageSelfTest.class); + return suite; } }
