ignite-sql-tests - marshalling fix + logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/37877ee6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/37877ee6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/37877ee6

Branch: refs/heads/ignite-sql-tests
Commit: 37877ee6e93ac4226bd0b537a479b0105e99064b
Parents: c89ed06
Author: S.Vladykin <[email protected]>
Authored: Mon Feb 9 18:14:17 2015 +0300
Committer: S.Vladykin <[email protected]>
Committed: Mon Feb 9 18:14:17 2015 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  | 34 +++++++++++-----
 .../h2/twostep/GridReduceQueryExecutor.java     | 31 ++++++++++-----
 .../twostep/messages/GridNextPageRequest.java   |  7 ++++
 .../twostep/messages/GridNextPageResponse.java  |  5 ++-
 .../query/h2/twostep/messages/GridQueryAck.java | 42 --------------------
 .../twostep/messages/GridQueryFailResponse.java |  7 ++++
 .../h2/twostep/messages/GridQueryRequest.java   |  8 ++++
 7 files changed, 72 insertions(+), 62 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37877ee6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index f322de7..ac5761b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -24,6 +24,7 @@ import
org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.h2.jdbc.*; import org.h2.result.*; @@ -83,14 +84,26 @@ public class GridMapQueryExecutor { ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() { @Override public boolean apply(UUID nodeId, Object msg) { - assert msg != null; + try { + assert msg != null; - ClusterNode node = ctx.discovery().node(nodeId); + ClusterNode node = ctx.discovery().node(nodeId); - if (msg instanceof GridQueryRequest) - executeLocalQuery(node, (GridQueryRequest)msg); - else if (msg instanceof GridNextPageRequest) - sendNextPage(node, (GridNextPageRequest)msg); + boolean processed = true; + + if (msg instanceof GridQueryRequest) + executeLocalQuery(node, (GridQueryRequest)msg); + else if (msg instanceof GridNextPageRequest) + sendNextPage(node, (GridNextPageRequest)msg); + else + processed = false; + + if (processed && log.isDebugEnabled()) + log.debug("Processed request: " + nodeId + "->" + ctx.localNodeId() + " " + msg); + } + catch(Throwable th) { + U.error(log, "Failed to process message: " + msg, th); + } return true; } @@ -148,6 +161,8 @@ public class GridMapQueryExecutor { } } catch (Throwable e) { + U.error(log, "Failed to execute local query: " + req, e); + sendError(node, req.requestId(), e); } finally { @@ -162,12 +177,13 @@ public class GridMapQueryExecutor { */ private void sendError(ClusterNode node, long qryReqId, Throwable err) { try { - ctx.io().sendUserMessage(F.asList(node), new GridQueryFailResponse(qryReqId, err)); + ctx.io().sendUserMessage(F.asList(node), new GridQueryFailResponse(qryReqId, err), + GridTopic.TOPIC_QUERY, false, 0); } - catch (IgniteCheckedException e) { + catch (Exception e) { 
e.addSuppressed(err); - log.error("Failed to send error message.", e); + U.error(log, "Failed to send error message.", e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37877ee6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 2149aef..608f50f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -25,8 +25,6 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; -import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -71,14 +69,26 @@ public class GridReduceQueryExecutor { ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() { @Override public boolean apply(UUID nodeId, Object msg) { - assert msg != null; + try { + assert msg != null; + + ClusterNode node = ctx.discovery().node(nodeId); - ClusterNode node = ctx.discovery().node(nodeId); + boolean processed = true; - if (msg instanceof GridNextPageResponse) - onNextPage(node, (GridNextPageResponse)msg); - else if (msg instanceof GridQueryFailResponse) - onFail(node, 
(GridQueryFailResponse)msg); + if (msg instanceof GridNextPageResponse) + onNextPage(node, (GridNextPageResponse)msg); + else if (msg instanceof GridQueryFailResponse) + onFail(node, (GridQueryFailResponse)msg); + else + processed = false; + + if (processed && log.isDebugEnabled()) + log.debug("Processed response: " + nodeId + "->" + ctx.localNodeId() + " " + msg); + } + catch(Throwable th) { + U.error(log, "Failed to process message: " + msg, th); + } return true; } @@ -111,7 +121,8 @@ public class GridReduceQueryExecutor { idx.addPage(new GridResultPage<UUID>(node.id(), msg) { @Override public void fetchNextPage() { try { - ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize)); + ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize), + GridTopic.TOPIC_QUERY, false, 0); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -153,7 +164,7 @@ public class GridReduceQueryExecutor { r.latch = new CountDownLatch(r.tbls.size() * nodes.size()); - this.runs.put(qryReqId, r); + runs.put(qryReqId, r); try { ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries()), // TODO conf page size http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37877ee6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridNextPageRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridNextPageRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridNextPageRequest.java index 8fa9ed0..c7bb582 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridNextPageRequest.java +++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridNextPageRequest.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; +import org.apache.ignite.internal.util.typedef.internal.*; + import java.io.*; /** @@ -64,4 +66,9 @@ public class GridNextPageRequest implements Serializable { public int pageSize() { return pageSize; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNextPageRequest.class, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37877ee6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridNextPageResponse.java index ad1dc84..ca2c803 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridNextPageResponse.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridNextPageResponse.java @@ -138,8 +138,11 @@ public class GridNextPageResponse implements Externalizable { first = false; } - for (Value val : row) + for (Value val : row) { + data.checkCapacity(data.getValueLen(val)); + data.writeValue(val); + } } out.writeInt(data.length()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37877ee6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryAck.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryAck.java 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryAck.java deleted file mode 100644 index 2399eec..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryAck.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.twostep.messages; - -import java.io.*; - -/** - * TODO write doc - */ -public class GridQueryAck implements Serializable { - /** */ - private long reqId; - - /** - * @param reqId Request ID. - */ - public GridQueryAck(long reqId) { - this.reqId = reqId; - } - - /** - * @return Request ID. 
- */ - public long requestId() { - return reqId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37877ee6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java index 42689ae..c05c36c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; +import org.apache.ignite.internal.util.typedef.internal.*; + import java.io.*; /** @@ -51,4 +53,9 @@ public class GridQueryFailResponse implements Serializable { public Throwable error() { return err; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryFailResponse.class, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/37877ee6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java index 800aa91..f3d632d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; import java.util.*; @@ -33,6 +35,7 @@ public class GridQueryRequest implements Serializable { private int pageSize; /** */ + @GridToStringInclude private Collection<GridCacheSqlQuery> qrys; /** @@ -66,4 +69,9 @@ public class GridQueryRequest implements Serializable { public Collection<GridCacheSqlQuery> queries() { return qrys; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryRequest.class, this); + } }
