http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 6a079f0..04449ac 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -31,6 +31,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -59,11 +60,13 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; @@ -71,11 +74,14 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.typedef.CIX2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.command.ddl.CreateTableData; import org.h2.engine.Session; @@ -92,6 +98,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE; /** * Reduce query executor. 
@@ -101,6 +108,9 @@ public class GridReduceQueryExecutor { public static final byte QUERY_POOL = GridIoPolicy.SYSTEM_POOL; /** */ + private static final IgniteProductVersion DISTRIBUTED_JOIN_SINCE = IgniteProductVersion.fromString("1.7.0"); + + /** */ private GridKernalContext ctx; /** */ @@ -149,6 +159,13 @@ public class GridReduceQueryExecutor { /** */ private final GridSpinBusyLock busyLock; + /** */ + private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() { + @Override public void applyx(ClusterNode locNode, Message msg) { + h2.mapQueryExecutor().onMessage(locNode.id(), msg); + } + }; + /** * @param busyLock Busy lock. */ @@ -173,6 +190,9 @@ public class GridReduceQueryExecutor { return; try { + if (msg instanceof GridCacheQueryMarshallable) + ((GridCacheQueryMarshallable)msg).unmarshall(ctx.config().getMarshaller(), ctx); + GridReduceQueryExecutor.this.onMessage(nodeId, msg); } finally { @@ -339,13 +359,13 @@ public class GridReduceQueryExecutor { * @param extraSpaces Extra spaces. * @return {@code true} If preloading is active. */ - private boolean isPreloadingActive(final GridCacheContext<?,?> cctx, List<String> extraSpaces) { + private boolean isPreloadingActive(final GridCacheContext<?, ?> cctx, List<Integer> extraSpaces) { if (hasMovingPartitions(cctx)) return true; if (extraSpaces != null) { - for (String extraSpace : extraSpaces) { - if (hasMovingPartitions(cacheContext(extraSpace))) + for (int i = 0; i < extraSpaces.size(); i++) { + if (hasMovingPartitions(cacheContext(extraSpaces.get(i)))) return true; } } @@ -357,7 +377,7 @@ public class GridReduceQueryExecutor { * @param cctx Cache context. 
* @return {@code true} If cache context */ - private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) { + private boolean hasMovingPartitions(GridCacheContext<?, ?> cctx) { GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false); for (GridDhtPartitionMap2 map : fullMap.values()) { @@ -369,34 +389,34 @@ public class GridReduceQueryExecutor { } /** - * @param name Cache name. + * @param cacheId Cache ID. * @return Cache context. */ - private GridCacheContext<?,?> cacheContext(String name) { - return ctx.cache().internalCache(name).context(); + private GridCacheContext<?,?> cacheContext(Integer cacheId) { + return ctx.cache().context().cacheContext(cacheId); } /** * @param topVer Topology version. * @param cctx Cache context for main space. * @param extraSpaces Extra spaces. - * @return Data nodes or {@code null} if repartitioning started and we need to retry.. + * @return Data nodes or {@code null} if repartitioning started and we need to retry. */ private Collection<ClusterNode> stableDataNodes( AffinityTopologyVersion topVer, - final GridCacheContext<?,?> cctx, - List<String> extraSpaces + final GridCacheContext<?, ?> cctx, + List<Integer> extraSpaces ) { - String space = cctx.name(); - Set<ClusterNode> nodes = new HashSet<>(cctx.affinity().assignment(topVer).primaryPartitionNodes()); if (F.isEmpty(nodes)) - throw new CacheException("Failed to find data nodes for cache: " + space); + throw new CacheException("Failed to find data nodes for cache: " + cctx.name()); if (!F.isEmpty(extraSpaces)) { - for (String extraSpace : extraSpaces) { - GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); + for (int i = 0; i < extraSpaces.size(); i++) { + GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i)); + + String extraSpace = extraCctx.name(); if (extraCctx.isLocal()) continue; // No consistency guaranties for local caches. @@ -448,10 +468,16 @@ public class GridReduceQueryExecutor { /** * @param cctx Cache context. 
* @param qry Query. - * @param keepBinary Keep binary. + * @param keepPortable Keep portable. + * @param enforceJoinOrder Enforce join order of tables. * @return Cursor. */ - public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepBinary) { + public Iterator<List<?>> query( + GridCacheContext<?, ?> cctx, + GridCacheTwoStepQuery qry, + boolean keepPortable, + boolean enforceJoinOrder + ) { for (int attempt = 0;; attempt++) { if (attempt != 0) { try { @@ -464,21 +490,15 @@ public class GridReduceQueryExecutor { } } - long qryReqId = reqIdGen.incrementAndGet(); + final long qryReqId = reqIdGen.incrementAndGet(); - QueryRun r = new QueryRun(); + final String space = cctx.name(); - r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize(); - - r.idxs = new ArrayList<>(qry.mapQueries().size()); - - String space = cctx.name(); - - r.conn = (JdbcConnection)h2.connectionForSpace(space); + final QueryRun r = new QueryRun(h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize()); AffinityTopologyVersion topVer = h2.readyTopologyVersion(); - List<String> extraSpaces = extraSpaces(space, qry.spaces()); + List<Integer> extraSpaces = qry.extraCaches(); Collection<ClusterNode> nodes; @@ -529,13 +549,12 @@ public class GridReduceQueryExecutor { idx = tbl.getScanIndex(null); - fakeTable(r.conn, tblIdx++).setInnerTable(tbl); + fakeTable(r.conn, tblIdx++).innerTable(tbl); } else idx = GridMergeIndexUnsorted.createDummy(ctx); - for (ClusterNode node : nodes) - idx.addSource(node.id()); + idx.setSources(nodes); r.idxs.add(idx); } @@ -548,10 +567,10 @@ public class GridReduceQueryExecutor { if (ctx.clientDisconnected()) { throw new CacheException("Query was cancelled, client node disconnected.", new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(), - "Client node disconnected.")); + "Client node disconnected.")); } - Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries(); + 
List<GridCacheSqlQuery> mapQrys = qry.mapQueries(); if (qry.explain()) { mapQrys = new ArrayList<>(qry.mapQueries().size()); @@ -560,17 +579,37 @@ public class GridReduceQueryExecutor { mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters())); } - if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes. - Marshaller m = ctx.config().getMarshaller(); + boolean retry = false; - for (GridCacheSqlQuery mapQry : mapQrys) - mapQry.marshallParams(m); - } + IgniteProductVersion minNodeVer = cctx.shared().exchange().minimumNodeVersion(topVer); - boolean retry = false; + final boolean oldStyle = minNodeVer.compareToIgnoreTimestamp(DISTRIBUTED_JOIN_SINCE) < 0; + final boolean distributedJoins = qry.distributedJoins(); + + if (oldStyle && distributedJoins) + throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes."); if (send(nodes, - new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) { + oldStyle ? + new GridQueryRequest(qryReqId, + r.pageSize, + space, + mapQrys, + topVer, + extraSpaces(space, qry.spaces()), + null) : + new GridH2QueryRequest() + .requestId(qryReqId) + .topologyVersion(topVer) + .pageSize(r.pageSize) + .caches(qry.caches()) + .tables(distributedJoins ? qry.tables() : null) + .partitions(convert(partsMap)) + .queries(mapQrys) + .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0), + oldStyle && partsMap != null ? 
new ExplicitPartitionsSpecializer(partsMap) : null, + distributedJoins) + ) { awaitAllReplies(r, nodes); Object state = r.state.get(); @@ -599,9 +638,6 @@ public class GridReduceQueryExecutor { Iterator<List<?>> resIter = null; if (!retry) { - if (qry.explain()) - return explainPlan(r.conn, space, qry); - if (skipMergeTbl) { List<List<?>> res = new ArrayList<>(); @@ -616,7 +652,7 @@ public class GridReduceQueryExecutor { int cols = row.getColumnCount(); - List<Object> resRow = new ArrayList<>(cols); + List<Object> resRow = new ArrayList<>(cols); for (int c = 0; c < cols; c++) resRow.add(row.getValue(c).getObject()); @@ -627,32 +663,52 @@ public class GridReduceQueryExecutor { resIter = res.iterator(); } else { - GridCacheSqlQuery rdc = qry.reduceQuery(); + UUID locNodeId = ctx.localNodeId(); - // Statement caching is prohibited here because we can't guarantee correct merge index reuse. - ResultSet res = h2.executeSqlQueryWithTimer(space, - r.conn, - rdc.query(), - F.asList(rdc.parameters()), - false); + h2.setupConnection(r.conn, false, enforceJoinOrder); - resIter = new Iter(res); - } - } + GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE) + .pageSize(r.pageSize).distributedJoins(false)); - for (GridMergeIndex idx : r.idxs) { - if (!idx.fetchedAll()) // We have to explicitly cancel queries on remote nodes. 
- send(nodes, new GridQueryCancelRequest(qryReqId), null); + try { + if (qry.explain()) + return explainPlan(r.conn, space, qry); + + GridCacheSqlQuery rdc = qry.reduceQuery(); + + ResultSet res = h2.executeSqlQueryWithTimer(space, + r.conn, + rdc.query(), + F.asList(rdc.parameters()), + false); + + resIter = new Iter(res); + } + finally { + GridH2QueryContext.clearThreadLocal(); + } + } } if (retry) { + send(nodes, new GridQueryCancelRequest(qryReqId), null, false); + if (Thread.currentThread().isInterrupted()) throw new IgniteInterruptedCheckedException("Query was interrupted."); continue; } - return new GridQueryCacheObjectsIterator(resIter, cctx, keepBinary); + final Collection<ClusterNode> finalNodes = nodes; + + return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable) { + @Override public void close() throws Exception { + super.close(); + + if (distributedJoins || !allIndexesFetched(r.idxs)) + send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false); + } + }; } catch (IgniteCheckedException | RuntimeException e) { U.closeQuiet(r.conn); @@ -678,13 +734,26 @@ public class GridReduceQueryExecutor { if (!skipMergeTbl) { for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) - fakeTable(null, i).setInnerTable(null); // Drop all merge tables. + fakeTable(null, i).innerTable(null); // Drop all merge tables. } } } } /** + * @param idxs Merge indexes. + * @return {@code true} If all remote data was fetched. + */ + private static boolean allIndexesFetched(List<GridMergeIndex> idxs) { + for (int i = 0; i < idxs.size(); i++) { + if (!idxs.get(i).fetchedAll()) + return false; + } + + return true; + } + + /** * @param r Query run. * @param nodes Nodes to check periodically if they alive. * @throws IgniteInterruptedCheckedException If interrupted. @@ -715,6 +784,7 @@ public class GridReduceQueryExecutor { /** * Gets or creates new fake table for index. * + * @param c Connection. * @param idx Index of table. * @return Table. 
*/ @@ -759,8 +829,8 @@ public class GridReduceQueryExecutor { * @param extraSpaces Extra spaces. * @return Collection of all data nodes owning all the caches or {@code null} for retry. */ - private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?,?> cctx, - List<String> extraSpaces) { + private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?, ?> cctx, + List<Integer> extraSpaces) { assert cctx.isReplicated() : cctx.name() + " must be replicated"; Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx); @@ -769,15 +839,15 @@ public class GridReduceQueryExecutor { return null; // Retry. if (!F.isEmpty(extraSpaces)) { - for (String extraSpace : extraSpaces) { - GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); + for (int i = 0; i < extraSpaces.size(); i++) { + GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i)); if (extraCctx.isLocal()) continue; if (!extraCctx.isReplicated()) throw new CacheException("Queries running on replicated cache should not contain JOINs " + - "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]"); + "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraCctx.name() + "]"); Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx); @@ -846,14 +916,14 @@ public class GridReduceQueryExecutor { */ @SuppressWarnings("unchecked") private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx, - List<String> extraSpaces) { + List<Integer> extraSpaces) { assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned"; final int partsCnt = cctx.affinity().partitions(); if (extraSpaces != null) { // Check correct number of partitions for partitioned caches. 
- for (String extraSpace : extraSpaces) { - GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); + for (int i = 0; i < extraSpaces.size(); i++) { + GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i)); if (extraCctx.isReplicated() || extraCctx.isLocal()) continue; @@ -862,7 +932,7 @@ public class GridReduceQueryExecutor { if (parts != partsCnt) throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" + - cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraSpace + ", parts2=" + parts + "]"); + cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() + ", parts2=" + parts + "]"); } } @@ -885,8 +955,8 @@ public class GridReduceQueryExecutor { if (extraSpaces != null) { // Find owner intersections for each participating partitioned cache partition. // We need this for logical collocation between different partitioned caches with the same affinity. - for (String extraSpace : extraSpaces) { - GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); + for (int i = 0; i < extraSpaces.size(); i++) { + GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i)); if (extraCctx.isReplicated() || extraCctx.isLocal()) continue; @@ -895,10 +965,10 @@ public class GridReduceQueryExecutor { List<ClusterNode> owners = extraCctx.topology().owners(p); if (F.isEmpty(owners)) { - if (!F.isEmpty(dataNodes(extraSpace, NONE))) + if (!F.isEmpty(dataNodes(extraCctx.name(), NONE))) return null; // Retry. - throw new CacheException("Failed to find data nodes [cache=" + extraSpace + ", part=" + p + "]"); + throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + ", part=" + p + "]"); } if (partLocs[p] == null) @@ -913,8 +983,8 @@ public class GridReduceQueryExecutor { } // Filter nodes where not all the replicated caches loaded. 
- for (String extraSpace : extraSpaces) { - GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); + for (int i = 0; i < extraSpaces.size(); i++) { + GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i)); if (!extraCctx.isReplicated()) continue; @@ -960,7 +1030,7 @@ public class GridReduceQueryExecutor { * @param allSpaces All spaces. * @return List of all extra spaces or {@code null} if none. */ - private List<String> extraSpaces(String mainSpace, Set<String> allSpaces) { + private List<String> extraSpaces(String mainSpace, Collection<String> allSpaces) { if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace))) return null; @@ -996,7 +1066,7 @@ public class GridReduceQueryExecutor { for (GridCacheSqlQuery mapQry : qry.mapQueries()) { GridMergeTable tbl = createMergeTable(c, mapQry, false); - fakeTable(c, tblIdx++).setInnerTable(tbl); + fakeTable(c, tblIdx++).innerTable(tbl); } GridCacheSqlQuery rdc = qry.reduceQuery(); @@ -1032,39 +1102,27 @@ public class GridReduceQueryExecutor { /** * @param nodes Nodes. * @param msg Message. - * @param partsMap Partitions. + * @param specialize Optional closure to specialize message for each node. + * @param runLocParallel Run local handler in parallel thread. * @return {@code true} If all messages sent successfully. */ private boolean send( Collection<ClusterNode> nodes, Message msg, - Map<ClusterNode,IntArray> partsMap + @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize, + boolean runLocParallel ) { - boolean locNodeFound = false; - - boolean ok = true; - - for (ClusterNode node : nodes) { - if (node.isLocal()) { - locNodeFound = true; - - continue; - } - - try { - ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, partsMap), QUERY_POOL); - } - catch (IgniteCheckedException e) { - ok = false; - - U.warn(log, e.getMessage()); - } - } - - if (locNodeFound) // Local node goes the last to allow parallel execution. 
- h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.discovery().localNode(), partsMap)); - - return ok; + if (log.isDebugEnabled()) + log.debug("Sending: [msg=" + msg + ", nodes=" + nodes + ", specialize=" + specialize + "]"); + + return h2.send(GridTopic.TOPIC_QUERY, + GridTopic.TOPIC_QUERY.ordinal(), + nodes, + msg, + specialize, + locNodeHnd, + QUERY_POOL, + runLocParallel); } /** @@ -1074,8 +1132,7 @@ public class GridReduceQueryExecutor { * @return Copy of message with partitions set. */ private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> partsMap) { - if (partsMap == null) - return msg; + assert partsMap != null; GridQueryRequest res = new GridQueryRequest((GridQueryRequest)msg); @@ -1083,11 +1140,35 @@ public class GridReduceQueryExecutor { assert parts != null : node; - int[] partsArr = new int[parts.size()]; + res.partitions(toArray(parts)); - parts.toArray(partsArr); + return res; + } - res.partitions(partsArr); + /** + * @param ints Ints. + * @return Array. + */ + public static int[] toArray(IntArray ints) { + int[] res = new int[ints.size()]; + + ints.toArray(res); + + return res; + } + + /** + * @param m Map. + * @return Converted map. + */ + private static Map<UUID, int[]> convert(Map<ClusterNode, IntArray> m) { + if (m == null) + return null; + + Map<UUID, int[]> res = U.newHashMap(m.size()); + + for (Map.Entry<ClusterNode,IntArray> entry : m.entrySet()) + res.put(entry.getKey().id(), toArray(entry.getValue())); return res; } @@ -1097,7 +1178,7 @@ public class GridReduceQueryExecutor { * @param qry Query. * @param explain Explain. * @return Table. - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed. */ private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain) throws IgniteCheckedException { @@ -1165,25 +1246,36 @@ public class GridReduceQueryExecutor { } /** - * + * Query run. 
*/ private static class QueryRun { /** */ - private List<GridMergeIndex> idxs; + private final List<GridMergeIndex> idxs; /** */ private CountDownLatch latch; /** */ - private JdbcConnection conn; + private final JdbcConnection conn; /** */ - private int pageSize; + private final int pageSize; /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */ private final AtomicReference<Object> state = new AtomicReference<>(); /** + * @param conn Connection. + * @param idxsCnt Number of indexes. + * @param pageSize Page size. + */ + private QueryRun(Connection conn, int idxsCnt, int pageSize) { + this.conn = (JdbcConnection)conn; + this.idxs = new ArrayList<>(idxsCnt); + this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE; + } + + /** * @param o Fail state object. * @param nodeId Node ID. */ @@ -1240,4 +1332,24 @@ public class GridReduceQueryExecutor { return res; } } + + /** + * + */ + private class ExplicitPartitionsSpecializer implements IgniteBiClosure<ClusterNode,Message,Message> { + /** */ + private final Map<ClusterNode,IntArray> partsMap; + + /** + * @param partsMap Partitions map. + */ + private ExplicitPartitionsSpecializer(Map<ClusterNode,IntArray> partsMap) { + this.partsMap = partsMap; + } + + /** {@inheritDoc} */ + @Override public Message apply(ClusterNode n, Message msg) { + return copy(msg, n, partsMap); + } + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java index a38d137..d46fb2f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import javax.cache.CacheException; import org.h2.api.TableEngine; import org.h2.command.ddl.CreateTableData; import org.h2.engine.DbObject; @@ -59,61 +60,74 @@ public class GridThreadLocalTable extends Table { /** * @param t Table or {@code null} to reset existing. */ - public void setInnerTable(Table t) { + public void innerTable(Table t) { if (t == null) tbl.remove(); else tbl.set(t); } + /** + * @return Inner table. 
+ */ + private Table innerTable() { + Table t = tbl.get(); + + if (t == null) + throw new CacheException("Table `" + getName() + "` can be accessed only within Ignite query context."); + + return t; + } + /** {@inheritDoc} */ @Override public Index getPrimaryKey() { - return tbl.get().getPrimaryKey(); + return innerTable().getPrimaryKey(); } /** {@inheritDoc} */ @Override public Column getRowIdColumn() { - return tbl.get().getRowIdColumn(); + return innerTable().getRowIdColumn(); } /** {@inheritDoc} */ - @Override public PlanItem getBestPlanItem(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) { - return tbl.get().getBestPlanItem(session, masks, filter, sortOrder); + @Override public PlanItem getBestPlanItem(Session session, int[] masks, TableFilter[] filters, int filter, + SortOrder sortOrder) { + return innerTable().getBestPlanItem(session, masks, filters, filter, sortOrder); } /** {@inheritDoc} */ @Override public Value getDefaultValue(Session session, Column column) { - return tbl.get().getDefaultValue(session, column); + return innerTable().getDefaultValue(session, column); } /** {@inheritDoc} */ @Override public SearchRow getTemplateSimpleRow(boolean singleColumn) { - return tbl.get().getTemplateSimpleRow(singleColumn); + return innerTable().getTemplateSimpleRow(singleColumn); } /** {@inheritDoc} */ @Override public Row getTemplateRow() { - return tbl.get().getTemplateRow(); + return innerTable().getTemplateRow(); } /** {@inheritDoc} */ @Override public Column getColumn(String columnName) { - return tbl.get().getColumn(columnName); + return innerTable().getColumn(columnName); } /** {@inheritDoc} */ @Override public Column getColumn(int index) { - return tbl.get().getColumn(index); + return innerTable().getColumn(index); } /** {@inheritDoc} */ @Override public Index getIndexForColumn(Column column) { - return tbl.get().getIndexForColumn(column); + return innerTable().getIndexForColumn(column); } /** {@inheritDoc} */ @Override public 
Column[] getColumns() { - return tbl.get().getColumns(); + return innerTable().getColumns(); } /** {@inheritDoc} */ @@ -122,8 +136,8 @@ public class GridThreadLocalTable extends Table { } /** {@inheritDoc} */ - @Override public void lock(Session session, boolean exclusive, boolean force) { - tbl.get().lock(session, exclusive, force); + @Override public boolean lock(Session session, boolean exclusive, boolean force) { + return innerTable().lock(session, exclusive, force); } /** {@inheritDoc} */ @@ -133,33 +147,33 @@ public class GridThreadLocalTable extends Table { /** {@inheritDoc} */ @Override public void unlock(Session s) { - tbl.get().unlock(s); + innerTable().unlock(s); } /** {@inheritDoc} */ @Override public Index addIndex(Session session, String indexName, int indexId, IndexColumn[] cols, IndexType indexType, boolean create, String indexComment) { - return tbl.get().addIndex(session, indexName, indexId, cols, indexType, create, indexComment); + return innerTable().addIndex(session, indexName, indexId, cols, indexType, create, indexComment); } /** {@inheritDoc} */ @Override public void removeRow(Session session, Row row) { - tbl.get().removeRow(session, row); + innerTable().removeRow(session, row); } /** {@inheritDoc} */ @Override public void truncate(Session session) { - tbl.get().truncate(session); + innerTable().truncate(session); } /** {@inheritDoc} */ @Override public void addRow(Session session, Row row) { - tbl.get().addRow(session, row); + innerTable().addRow(session, row); } /** {@inheritDoc} */ @Override public void checkSupportAlter() { - tbl.get().checkSupportAlter(); + innerTable().checkSupportAlter(); } /** {@inheritDoc} */ @@ -169,22 +183,22 @@ public class GridThreadLocalTable extends Table { /** {@inheritDoc} */ @Override public Index getUniqueIndex() { - return tbl.get().getUniqueIndex(); + return innerTable().getUniqueIndex(); } /** {@inheritDoc} */ @Override public Index getScanIndex(Session session) { - return 
tbl.get().getScanIndex(session); + return innerTable().getScanIndex(session); } /** {@inheritDoc} */ @Override public ArrayList<Index> getIndexes() { - return tbl.get().getIndexes(); + return innerTable().getIndexes(); } /** {@inheritDoc} */ @Override public boolean isLockedExclusively() { - return tbl.get().isLockedExclusively(); + return innerTable().isLockedExclusively(); } /** {@inheritDoc} */ @@ -194,12 +208,12 @@ public class GridThreadLocalTable extends Table { /** {@inheritDoc} */ @Override public boolean isDeterministic() { - return tbl.get().isDeterministic(); + return innerTable().isDeterministic(); } /** {@inheritDoc} */ @Override public boolean canGetRowCount() { - return tbl.get().canGetRowCount(); + return innerTable().canGetRowCount(); } /** {@inheritDoc} */ @@ -209,7 +223,7 @@ public class GridThreadLocalTable extends Table { /** {@inheritDoc} */ @Override public long getRowCount(Session session) { - return tbl.get().getRowCount(session); + return innerTable().getRowCount(session); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java index 362a760..571f9ac 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java @@ -115,7 +115,7 @@ public class GridH2Array extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Array.class); } /** {@inheritDoc} */ @@ -127,4 +127,9 @@ public class GridH2Array extends GridH2ValueMessage 
{ @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(x); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java index 0628764..edd404e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java @@ -99,7 +99,7 @@ public class GridH2Boolean extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Boolean.class); } /** {@inheritDoc} */ @@ -107,7 +107,13 @@ public class GridH2Boolean extends GridH2ValueMessage { return -5; } + /** {@inheritDoc} */ @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(x); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java index 9f608b9..894794e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java +++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java @@ -99,7 +99,7 @@ public class GridH2Byte extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Byte.class); } /** {@inheritDoc} */ @@ -111,4 +111,9 @@ public class GridH2Byte extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(x); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java index 0fb9e5d..29a52be 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java @@ -24,6 +24,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; import org.h2.value.ValueBytes; +import static org.h2.util.StringUtils.convertBytesToHex; + /** * H2 Bytes. 
*/ @@ -99,7 +101,7 @@ public class GridH2Bytes extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Bytes.class); } /** {@inheritDoc} */ @@ -111,4 +113,9 @@ public class GridH2Bytes extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return "b_" + convertBytesToHex(b); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java index 1a81234..942ab7c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java @@ -102,7 +102,7 @@ public class GridH2CacheObject extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2CacheObject.class); } /** {@inheritDoc} */ @@ -146,4 +146,9 @@ public class GridH2CacheObject extends GridH2ValueMessage { @Override public byte fieldsCount() { return 2; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(obj); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java index 6bd5237..8025257 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java @@ -101,7 +101,7 @@ public class GridH2Date extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Date.class); } /** {@inheritDoc} */ @@ -113,4 +113,9 @@ public class GridH2Date extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(date); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java index f5d4865..a3ad444 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java @@ -26,6 +26,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; import org.h2.value.ValueDecimal; +import static org.h2.util.StringUtils.convertBytesToHex; + /** * H2 Decimal. 
*/ @@ -121,7 +123,7 @@ public class GridH2Decimal extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Decimal.class); } /** {@inheritDoc} */ @@ -133,4 +135,9 @@ public class GridH2Decimal extends GridH2ValueMessage { @Override public byte fieldsCount() { return 2; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return scale + "_" + convertBytesToHex(b); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java index 481db30..2ceea8d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java @@ -99,7 +99,7 @@ public class GridH2Double extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Double.class); } /** {@inheritDoc} */ @@ -111,4 +111,9 @@ public class GridH2Double extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(x); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java index 55f9380..6923470 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java @@ -99,7 +99,7 @@ public class GridH2Float extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Float.class); } /** {@inheritDoc} */ @@ -111,4 +111,9 @@ public class GridH2Float extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(x); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java index 21070d1..0d118b4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java @@ -25,6 +25,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; +import static org.h2.util.StringUtils.convertBytesToHex; + /** * H2 Geometry. 
*/ @@ -120,7 +122,7 @@ public class GridH2Geometry extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Geometry.class); } /** {@inheritDoc} */ @@ -132,4 +134,9 @@ public class GridH2Geometry extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return "g_" + convertBytesToHex(b); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java new file mode 100644 index 0000000..e49c48f --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.twostep.msg; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.UUID; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Index range request message: carries the origin node ID, query ID, batch lookup ID and a list of range bounds. + */ +public class GridH2IndexRangeRequest implements Message { + /** Origin node ID. */ + private UUID originNodeId; + + /** Query ID. */ + private long qryId; + + /** Batch lookup ID. */ + private int batchLookupId; + + /** Range bounds list. */ + @GridDirectCollection(Message.class) + private List<GridH2RowRangeBounds> bounds; + + /** + * @param bounds Range bounds list. + */ + public void bounds(List<GridH2RowRangeBounds> bounds) { + this.bounds = bounds; + } + + /** + * @return Range bounds list. + */ + public List<GridH2RowRangeBounds> bounds() { + return bounds; + } + + /** + * @return Origin node ID. + */ + public UUID originNodeId() { + return originNodeId; + } + + /** + * @param originNodeId Origin node ID. + */ + public void originNodeId(UUID originNodeId) { + this.originNodeId = originNodeId; + } + + /** + * @return Query ID. + */ + public long queryId() { + return qryId; + } + + /** + * @param qryId Query ID. + */ + public void queryId(long qryId) { + this.qryId = qryId; + } + + /** + * @param batchLookupId Batch lookup ID. + */ + public void batchLookupId(int batchLookupId) { + this.batchLookupId = batchLookupId; + } + + /** + * @return Batch lookup ID. 
+ */ + public int batchLookupId() { + return batchLookupId; + } + + /** {@inheritDoc} Each {@code case} intentionally falls through to the next; a failed write returns {@code false} before the state is incremented. */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt("batchLookupId", batchLookupId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeCollection("bounds", bounds, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeUuid("originNodeId", originNodeId)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeLong("qryId", qryId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} Each {@code case} intentionally falls through to the next; when {@code reader.isLastRead()} is {@code false} the method returns {@code false} before the state is incremented. */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + batchLookupId = reader.readInt("batchLookupId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + bounds = reader.readCollection("bounds", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + originNodeId = reader.readUuid("originNodeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + qryId = reader.readLong("qryId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridH2IndexRangeRequest.class); + } + + /** 
{@inheritDoc} */ + @Override public byte directType() { + return -30; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridH2IndexRangeRequest.class, this, "boundsSize", bounds == null ? null : bounds.size()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java new file mode 100644 index 0000000..c6414bd --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.twostep.msg; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.UUID; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Index range response message: carries the ranges together with a status byte and an error message that may be {@code null}. + */ +public class GridH2IndexRangeResponse implements Message { + /** Status: OK. */ + public static final byte STATUS_OK = 0; + + /** Status: error. */ + public static final byte STATUS_ERROR = 1; + + /** Status: not found. */ + public static final byte STATUS_NOT_FOUND = 2; + + /** Origin node ID. */ + private UUID originNodeId; + + /** Query ID. */ + private long qryId; + + /** Batch lookup ID. */ + private int batchLookupId; + + /** Ranges. */ + @GridDirectCollection(Message.class) + private List<GridH2RowRange> ranges; + + /** Status. */ + private byte status; + + /** Error message or {@code null} if everything is ok. */ + private String err; + + /** + * @param ranges Ranges. + */ + public void ranges(List<GridH2RowRange> ranges) { + this.ranges = ranges; + } + + /** + * @return Ranges. + */ + public List<GridH2RowRange> ranges() { + return ranges; + } + + /** + * @return Origin node ID. + */ + public UUID originNodeId() { + return originNodeId; + } + + /** + * @param originNodeId Origin node ID. + */ + public void originNodeId(UUID originNodeId) { + this.originNodeId = originNodeId; + } + + /** + * @return Query ID. + */ + public long queryId() { + return qryId; + } + + /** + * @param qryId Query ID. 
+ */ + public void queryId(long qryId) { + this.qryId = qryId; + } + + /** + * @param err Error message. + */ + public void error(String err) { + this.err = err; + } + + /** + * @return Error message or {@code null} if everything is ok. + */ + public String error() { + return err; + } + + /** + * @param status Status. 
+ */ + public void status(byte status) { + this.status = status; + } + + /** + * @return Status. + */ + public byte status() { + return status; + } + + /** + * @param batchLookupId Batch lookup ID. + */ + public void batchLookupId(int batchLookupId) { + this.batchLookupId = batchLookupId; + } + + /** + * @return Batch lookup ID. + */ + public int batchLookupId() { + return batchLookupId; + } + + /** {@inheritDoc} Each {@code case} intentionally falls through to the next; a failed write returns {@code false} before the state is incremented. */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt("batchLookupId", batchLookupId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeString("err", err)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeUuid("originNodeId", originNodeId)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeLong("qryId", qryId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeCollection("ranges", ranges, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeByte("status", status)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} Each {@code case} intentionally falls through to the next; when {@code reader.isLastRead()} is {@code false} the method returns {@code false} before the state is incremented. 
*/ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + batchLookupId = reader.readInt("batchLookupId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + err = reader.readString("err"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + originNodeId = reader.readUuid("originNodeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + qryId = 
reader.readLong("qryId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + ranges = reader.readCollection("ranges", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + status = reader.readByte("status"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridH2IndexRangeResponse.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -31; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 6; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridH2IndexRangeResponse.class, this, "rangesSize", ranges == null ? null : ranges.size()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java index 30c7623..7c6046c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java @@ -99,7 +99,7 @@ public class GridH2Integer extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Integer.class); } /** {@inheritDoc} */ @@ -111,4 +111,20 @@ public class GridH2Integer extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public 
boolean equals(Object obj) { + return obj == this || (obj != null && obj.getClass() == GridH2Integer.class && x == ((GridH2Integer)obj).x); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return x; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(x); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java index edfde33..b989171 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java @@ -24,6 +24,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; import org.h2.value.ValueJavaObject; +import static org.h2.util.StringUtils.convertBytesToHex; + /** * H2 Java Object. 
*/ @@ -99,7 +101,7 @@ public class GridH2JavaObject extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2JavaObject.class); } /** {@inheritDoc} */ @@ -111,4 +113,9 @@ public class GridH2JavaObject extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return "j_" + convertBytesToHex(b); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java index e6af9e1..3d360f0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java @@ -99,7 +99,7 @@ public class GridH2Long extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Long.class); } /** {@inheritDoc} */ @@ -111,4 +111,9 @@ public class GridH2Long extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(x); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java index 2394b78..50a49ba 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java @@ -64,7 +64,13 @@ public class GridH2Null extends GridH2ValueMessage { @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); - return reader.beforeMessageRead() && super.readFrom(buf, reader); + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + return reader.afterMessageRead(GridH2Null.class); } /** {@inheritDoc} */ @@ -76,4 +82,9 @@ public class GridH2Null extends GridH2ValueMessage { @Override public byte fieldsCount() { return 0; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return "NULL"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java new file mode 100644 index 0000000..dc82b2c --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.msg; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; +import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Query request. 
+ */ +public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** + * Flag bit: map query will not destroy context until an explicit query cancel request + * is received, because distributed join requests can still be received. + */ + public static final int FLAG_DISTRIBUTED_JOINS = 1; + + /** Request ID. */ + private long reqId; + + /** Caches. */ + @GridToStringInclude + @GridDirectCollection(Integer.class) + private List<Integer> caches; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + + /** Explicit partitions mappings for nodes. */ + @GridToStringInclude + @GridDirectMap(keyType = UUID.class, valueType = int[].class) + private Map<UUID, int[]> parts; + + /** Page size. */ + private int pageSize; + + /** SQL queries. */ + @GridToStringInclude + @GridDirectCollection(Message.class) + private List<GridCacheSqlQuery> qrys; + + /** Flags. */ + private byte flags; + + /** Tables. */ + @GridToStringInclude + @GridDirectCollection(String.class) + private Collection<String> tbls; + + /** + * @param tbls Tables. + * @return {@code this}. + */ + public GridH2QueryRequest tables(Collection<String> tbls) { + this.tbls = tbls; + + return this; + } + + /** + * @return Tables. + */ + public Collection<String> tables() { + return tbls; + } + + /** + * @param reqId Request ID. + * @return {@code this}. + */ + public GridH2QueryRequest requestId(long reqId) { + this.reqId = reqId; + + return this; + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** + * @param caches Caches. + * @return {@code this}. + */ + public GridH2QueryRequest caches(List<Integer> caches) { + this.caches = caches; + + return this; + } + + /** + * @return Caches. + */ + public List<Integer> caches() { + return caches; + } + + /** + * @param topVer Topology version. + * @return {@code this}. 
+ */ + public GridH2QueryRequest topologyVersion(AffinityTopologyVersion topVer) { + this.topVer = topVer; + + return this; + } + + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return Explicit partitions mapping. + */ + public Map<UUID,int[]> partitions() { + return parts; + } + + /** + * @param parts Explicit partitions mapping. + * @return {@code this}. + */ + public GridH2QueryRequest partitions(Map<UUID,int[]> parts) { + this.parts = parts; + + return this; + } + + /** + * @param pageSize Page size. + * @return {@code this}. + */ + public GridH2QueryRequest pageSize(int pageSize) { + this.pageSize = pageSize; + + return this; + } + + /** + * @return Page size. + */ + public int pageSize() { + return pageSize; + } + + /** + * @param qrys SQL Queries. + * @return {@code this}. + */ + public GridH2QueryRequest queries(List<GridCacheSqlQuery> qrys) { + this.qrys = qrys; + + return this; + } + + /** + * @return SQL Queries. + */ + public List<GridCacheSqlQuery> queries() { + return qrys; + } + + /** + * @param flags Flags. + * @return {@code this}. + */ + public GridH2QueryRequest flags(int flags) { + this.flags = (byte)flags; + + return this; + } + + /** + * @param flags Flags to check. + * @return {@code true} If all the requested flags are set to {@code true}. 
+ */ + public boolean isFlagSet(int flags) { + return (this.flags & flags) == flags; + } + + /** {@inheritDoc} */ + @Override public void marshall(Marshaller m) { + if (F.isEmpty(qrys)) + return; + + for (GridCacheSqlQuery qry : qrys) + qry.marshall(m); + } + + /** {@inheritDoc} */ + @Override public void unmarshall(Marshaller m, GridKernalContext ctx) { + if (F.isEmpty(qrys)) + return; + + for (GridCacheSqlQuery qry : qrys) + qry.unmarshall(m, ctx); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeCollection("caches", caches, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("pageSize", pageSize)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeLong("reqId", reqId)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + caches = 
reader.readCollection("caches", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + pageSize = reader.readInt("pageSize"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + reqId = reader.readLong("reqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridH2QueryRequest.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -33; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 8; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridH2QueryRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowMessage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowMessage.java new file mode 100644 index 0000000..59c548d --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowMessage.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.twostep.msg; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * SQL Row message. + */ +public class GridH2RowMessage implements Message { + /** */ + @GridDirectCollection(Message.class) + @GridToStringInclude + private List<GridH2ValueMessage> vals; + + /** + * @return Values of row. + */ + public List<GridH2ValueMessage> values() { + return vals; + } + + /** + * @param vals Values of row. + */ + public void values(List<GridH2ValueMessage> vals) { + this.vals = vals; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + vals = reader.readCollection("vals", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridH2RowMessage.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + 
return -32; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridH2RowMessage.class, this); + } +}
