http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java index 914e0da..0829df0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java @@ -17,12 +17,15 @@ package org.apache.ignite.internal.processors.query.h2.opt; +import java.lang.reflect.Field; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.NavigableMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.util.GridEmptyIterator; import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard; @@ -43,18 +46,36 @@ import org.h2.value.Value; import org.jetbrains.annotations.Nullable; /** - * Base class for snapshotable tree indexes. + * Base class for snapshotable segmented tree indexes. 
*/ @SuppressWarnings("ComparatorNotSerializable") public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> { + /** */ - private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree; + private static Field KEY_FIELD; + + /** */ + static { + try { + KEY_FIELD = GridH2AbstractKeyValueRow.class.getDeclaredField("key"); + KEY_FIELD.setAccessible(true); + } + catch (NoSuchFieldException e) { + KEY_FIELD = null; + } + } + + /** Index segments. */ + private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>[] segments; /** */ private final boolean snapshotEnabled; + /** */ + private final GridH2RowDescriptor desc; + /** - * Constructor with index initialization. + * Constructor with index initialization. Creates index with single segment. * * @param name Index name. * @param tbl Table. @@ -63,35 +84,59 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS */ @SuppressWarnings("unchecked") public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList) { + this(name, tbl, pk, colsList, 1); + } + + /** + * Constructor with index initialization. + * + * @param name Index name. + * @param tbl Table. + * @param pk If this index is primary key. + * @param colsList Index columns list. + * @param segmentsCnt Number of segments. + */ + @SuppressWarnings("unchecked") + public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList, int segmentsCnt) { + assert segmentsCnt > 0 : segmentsCnt; + IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]); IndexColumn.mapColumns(cols, tbl); + desc = tbl.rowDescriptor(); + initBaseIndex(tbl, 0, name, cols, pk ? 
IndexType.createPrimaryKey(false, false) : IndexType.createNonUnique(false, false, false)); + segments = new ConcurrentNavigableMap[segmentsCnt]; + final GridH2RowDescriptor desc = tbl.rowDescriptor(); if (desc == null || desc.memory() == null) { snapshotEnabled = desc == null || desc.snapshotableIndex(); if (snapshotEnabled) { - tree = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) { - @Override protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) { - if (val != null) - node.key = (GridSearchRowPointer)val; - } + for (int i = 0; i < segmentsCnt; i++) { + segments[i] = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) { + @Override + protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) { + if (val != null) + node.key = (GridSearchRowPointer)val; + } - @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) { - if (key instanceof ComparableRow) - return (Comparable<? super SearchRow>)key; + @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) { + if (key instanceof ComparableRow) + return (Comparable<? 
super SearchRow>)key; - return super.comparable(key); - } - }; + return super.comparable(key); + } + }; + } } else { - tree = new ConcurrentSkipListMap<>( + for (int i = 0; i < segmentsCnt; i++) { + segments[i] = new ConcurrentSkipListMap<>( new Comparator<GridSearchRowPointer>() { @Override public int compare(GridSearchRowPointer o1, GridSearchRowPointer o2) { if (o1 instanceof ComparableRow) @@ -103,7 +148,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS return compareRows(o1, o2); } } - ); + ); + } } } else { @@ -111,28 +157,30 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS snapshotEnabled = true; - tree = new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) { - @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) { - final long oldKey = keyPtr(node); + for (int i = 0; i < segmentsCnt; i++) { + segments[i] = new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) { + @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) { + final long oldKey = keyPtr(node); - if (val != null) { - key(node, val); + if (val != null) { + key(node, val); - guard.finalizeLater(new Runnable() { - @Override public void run() { - desc.createPointer(oldKey).decrementRefCount(); - } - }); + guard.finalizeLater(new Runnable() { + @Override public void run() { + desc.createPointer(oldKey).decrementRefCount(); + } + }); + } } - } - @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) { - if (key instanceof ComparableRow) - return (Comparable<? super SearchRow>)key; + @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) { + if (key instanceof ComparableRow) + return (Comparable<? 
super SearchRow>)key; - return super.comparable(key); - } - }; + return super.comparable(key); + } + }; + } } initDistributedJoinMessaging(tbl); @@ -142,20 +190,24 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS @Override protected Object doTakeSnapshot() { assert snapshotEnabled; + int seg = threadLocalSegment(); + + ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree = segments[seg]; + return tree instanceof SnapTreeMap ? ((SnapTreeMap)tree).clone() : ((GridOffHeapSnapTreeMap)tree).clone(); } /** {@inheritDoc} */ - protected final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() { + protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead(int seg) { if (!snapshotEnabled) - return tree; + return segments[seg]; ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> res = threadLocalSnapshot(); if (res == null) - res = tree; + return segments[seg]; return res; } @@ -164,19 +216,37 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS @Override public void destroy() { assert threadLocalSnapshot() == null; - if (tree instanceof AutoCloseable) - U.closeQuiet((AutoCloseable)tree); + for (int i = 0; i < segments.length; i++) { + if (segments[i] instanceof AutoCloseable) + U.closeQuiet((AutoCloseable)segments[i]); + } super.destroy(); } + + /** {@inheritDoc} */ + protected int threadLocalSegment() { + GridH2QueryContext qctx = GridH2QueryContext.get(); + + if(segments.length == 1) + return 0; + + if(qctx == null) + throw new IllegalStateException("GridH2QueryContext is not initialized."); + + return qctx.segment(); + } + /** {@inheritDoc} */ @Override public long getRowCount(@Nullable Session ses) { IndexingQueryFilter f = threadLocalFilter(); + int seg = threadLocalSegment(); + // Fast path if we don't need to perform any filtering. 
if (f == null || f.forSpace((getTable()).spaceName()) == null) - return treeForRead().size(); + return treeForRead(seg).size(); Iterator<GridH2Row> iter = doFind(null, false, null); @@ -255,7 +325,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS * @return Row. */ public GridH2Row findOne(GridSearchRowPointer row) { - return tree.get(row); + int seg = threadLocalSegment(); + + return segments[seg].get(row); } /** @@ -268,7 +340,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS */ @SuppressWarnings("unchecked") private Iterator<GridH2Row> doFind(@Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last) { - ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead(); + int seg = threadLocalSegment(); + + ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead(seg); return doFind0(t, first, includeFirst, last, threadLocalFilter()); } @@ -359,12 +433,68 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS /** {@inheritDoc} */ @Override public GridH2Row put(GridH2Row row) { - return tree.put(row, row); + int seg = segment(row); + + return segments[seg].put(row, row); } /** {@inheritDoc} */ @Override public GridH2Row remove(SearchRow row) { - return tree.remove(comparable(row, 0)); + GridSearchRowPointer comparable = comparable(row, 0); + + int seg = segment(row); + + return segments[seg].remove(comparable); + } + + /** {@inheritDoc} */ + @Override protected int segmentsCount() { + return segments.length; + } + + /** + * @param partition Parttition idx. 
+ * @return index currentSegment Id for given key + */ + protected int segment(int partition) { + return partition % segments.length; + } + + /** + * @param row + * @return index currentSegment Id for given row + */ + private int segment(SearchRow row) { + assert row != null; + + CacheObject key; + + if (desc != null && desc.context() != null) { + GridCacheContext<?, ?> ctx = desc.context(); + + assert ctx != null; + + if (row instanceof GridH2AbstractKeyValueRow && KEY_FIELD != null) { + try { + Object o = KEY_FIELD.get(row); + + if (o instanceof CacheObject) + key = (CacheObject)o; + else + key = ctx.toCacheKeyObject(o); + + } + catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } + } + else + key = ctx.toCacheKeyObject(row.getValue(0)); + + return segment(ctx.affinity().partition(key)); + } + else + return 0; } /** @@ -463,18 +593,20 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS IndexColumn[] cols = getIndexColumns(); GridH2TreeIndex idx = new GridH2TreeIndex(getName(), getTable(), - getIndexType().isUnique(), F.asList(cols)); + getIndexType().isUnique(), F.asList(cols), segments.length); Thread thread = Thread.currentThread(); - long i = 0; + long j = 0; - for (GridH2Row row : tree.values()) { - // Check for interruptions every 1000 iterations. - if (++i % 1000 == 0 && thread.isInterrupted()) - throw new InterruptedException(); + for (int i = 0; i < segments.length; i++) { + for (GridH2Row row : segments[i].values()) { + // Check for interruptions every 1000 iterations. + if ((++j & 1023) == 0 && thread.isInterrupted()) + throw new InterruptedException(); - idx.tree.put(row, row); + idx.put(row); + } } return idx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index ac1a6a6..5027c9a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReferenceArray; import javax.cache.CacheException; @@ -56,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; @@ -84,6 +86,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED; import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL; @@ -161,8 +165,7 @@ public class GridMapQueryExecutor { if (nodeRess == null) return; - for (QueryResults ress : nodeRess.results().values()) - ress.cancel(true); + nodeRess.cancelAll(); } }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); @@ -235,12 +238,7 @@ public class GridMapQueryExecutor { GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP); } - QueryResults results = nodeRess.results().remove(qryReqId); - - if (results == null) - return; - - results.cancel(true); + nodeRess.cancelRequest(qryReqId); } /** @@ -427,6 +425,7 @@ public class GridMapQueryExecutor { onQueryRequest0(node, req.requestId(), + 0, req.queries(), cacheIds, req.topologyVersion(), @@ -434,7 +433,7 @@ public class GridMapQueryExecutor { req.partitions(), null, req.pageSize(), - false, + OFF, req.timeout()); } @@ -442,12 +441,49 @@ public class GridMapQueryExecutor { * @param node Node. * @param req Query request. */ - private void onQueryRequest(ClusterNode node, GridH2QueryRequest req) { - Map<UUID,int[]> partsMap = req.partitions(); - int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId()); + private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException { + final Map<UUID,int[]> partsMap = req.partitions(); + final int[] parts = partsMap == null ? 
null : partsMap.get(ctx.localNodeId()); + + assert req.caches() != null && !req.caches().isEmpty(); + + GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext( req.caches().get(0)); + + if (mainCctx == null) + throw new CacheException("Failed to find cache."); + + final DistributedJoinMode joinMode = distributedJoinMode( + req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL), + req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS)); + + for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) { + final int segment = i; + + ctx.closure().callLocal( + new Callable<Void>() { + @Override public Void call() throws Exception { + onQueryRequest0(node, + req.requestId(), + segment, + req.queries(), + req.caches(), + req.topologyVersion(), + partsMap, + parts, + req.tables(), + req.pageSize(), + joinMode, + req.timeout()); + + return null; + } + } + , QUERY_POOL); + } onQueryRequest0(node, req.requestId(), + 0, req.queries(), req.caches(), req.topologyVersion(), @@ -455,13 +491,14 @@ public class GridMapQueryExecutor { parts, req.tables(), req.pageSize(), - req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS), + joinMode, req.timeout()); } /** * @param node Node authored request. * @param reqId Request ID. + * @param segmentId index segment ID. * @param qrys Queries to execute. * @param cacheIds Caches which will be affected by these queries. * @param topVer Topology version. @@ -469,11 +506,12 @@ public class GridMapQueryExecutor { * @param parts Explicit partitions for current node. * @param tbls Tables. * @param pageSize Page size. - * @param distributedJoins Can we expect distributed joins to be ran. + * @param distributedJoinMode Query distributed join mode. 
*/ private void onQueryRequest0( ClusterNode node, long reqId, + int segmentId, Collection<GridCacheSqlQuery> qrys, List<Integer> cacheIds, AffinityTopologyVersion topVer, @@ -481,7 +519,7 @@ public class GridMapQueryExecutor { int[] parts, Collection<String> tbls, int pageSize, - boolean distributedJoins, + DistributedJoinMode distributedJoinMode, int timeout ) { // Prepare to run queries. @@ -500,7 +538,7 @@ public class GridMapQueryExecutor { if (topVer != null) { // Reserve primary for topology version or explicit partitions. if (!reservePartitions(cacheIds, topVer, parts, reserved)) { - sendRetry(node, reqId); + sendRetry(node, reqId, segmentId); return; } @@ -508,17 +546,18 @@ public class GridMapQueryExecutor { qr = new QueryResults(reqId, qrys.size(), mainCctx); - if (nodeRess.results().put(reqId, qr) != null) + if (nodeRess.put(reqId, segmentId, qr) != null) throw new IllegalStateException(); // Prepare query context. GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(), node.id(), reqId, + segmentId, mainCctx.isReplicated() ? REPLICATED : MAP) .filter(h2.backupFilter(topVer, parts)) .partitionsMap(partsMap) - .distributedJoins(distributedJoins) + .distributedJoinMode(distributedJoinMode) .pageSize(pageSize) .topologyVersion(topVer) .reservations(reserved); @@ -542,7 +581,7 @@ public class GridMapQueryExecutor { Connection conn = h2.connectionForSpace(mainCctx.name()); // Here we enforce join order to have the same behavior on all the nodes. - h2.setupConnection(conn, distributedJoins, true); + h2.setupConnection(conn, distributedJoinMode != OFF, true); GridH2QueryContext.set(qctx); @@ -553,13 +592,13 @@ public class GridMapQueryExecutor { if (nodeRess.cancelled(reqId)) { GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type()); - nodeRess.results().remove(reqId); + nodeRess.cancelRequest(reqId); throw new QueryCancelledException(); } // Run queries. 
- int i = 0; + int qryIdx = 0; boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED); @@ -567,7 +606,7 @@ public class GridMapQueryExecutor { ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(), F.asList(qry.parameters()), true, timeout, - qr.cancels[i]); + qr.cancels[qryIdx]); if (evt) { ctx.event().record(new CacheQueryExecutedEvent<>( @@ -587,24 +626,24 @@ public class GridMapQueryExecutor { assert rs instanceof JdbcResultSet : rs.getClass(); - qr.addResult(i, qry, node.id(), rs); + qr.addResult(qryIdx, qry, node.id(), rs); if (qr.canceled) { - qr.result(i).close(); + qr.result(qryIdx).close(); throw new QueryCancelledException(); } // Send the first page. - sendNextPage(nodeRess, node, qr, i, pageSize); + sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize); - i++; + qryIdx++; } } finally { GridH2QueryContext.clearThreadLocal(); - if (!distributedJoins) + if (distributedJoinMode == OFF) qctx.clearContext(false); if (!F.isEmpty(snapshotedTbls)) { @@ -615,13 +654,13 @@ public class GridMapQueryExecutor { } catch (Throwable e) { if (qr != null) { - nodeRess.results().remove(reqId, qr); + nodeRess.remove(reqId, segmentId, qr); qr.cancel(false); } if (X.hasCause(e, GridH2RetryException.class)) - sendRetry(node, reqId); + sendRetry(node, reqId, segmentId); else { U.error(log, "Failed to execute local query.", e); @@ -681,14 +720,14 @@ public class GridMapQueryExecutor { return; } - QueryResults qr = nodeRess.results().get(req.queryRequestId()); + QueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId()); if (qr == null) sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req)); else if (qr.canceled) sendError(node, req.queryRequestId(), new QueryCancelledException()); else - sendNextPage(nodeRess, node, qr, req.query(), req.pageSize()); + sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize()); } /** @@ -696,9 +735,10 @@ public class 
GridMapQueryExecutor { * @param node Node. * @param qr Query results. * @param qry Query. + * @param segmentId Index segment ID. * @param pageSize Page size. */ - private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, + private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, int segmentId, int pageSize) { QueryResult res = qr.result(qry); @@ -714,14 +754,14 @@ public class GridMapQueryExecutor { res.close(); if (qr.isAllClosed()) - nodeRess.results().remove(qr.qryReqId, qr); + nodeRess.remove(qr.qryReqId, segmentId, qr); } try { boolean loc = node.isLocal(); - GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page, - page == 0 ? res.rowCnt : -1 , + GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, segmentId, qry, page, + page == 0 ? res.rowCnt : -1, res.cols, loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)), loc ? rows : null); @@ -741,12 +781,13 @@ public class GridMapQueryExecutor { /** * @param node Node. * @param reqId Request ID. + * @param segmentId Index segment ID. */ - private void sendRetry(ClusterNode node, long reqId) { + private void sendRetry(ClusterNode node, long reqId, int segmentId) { try { boolean loc = node.isLocal(); - GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, + GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, segmentId, /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1, loc ? null : Collections.<Message>emptyList(), loc ? 
Collections.<Value[]>emptyList() : null); @@ -780,35 +821,118 @@ public class GridMapQueryExecutor { */ private static class NodeResults { /** */ - private final ConcurrentMap<Long, QueryResults> res = new ConcurrentHashMap8<>(); + private final ConcurrentMap<RequestKey, QueryResults> res = new ConcurrentHashMap8<>(); /** */ private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist = new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q); /** - * @return All results. + * @param reqId Query Request ID. + * @return {@code False} if query was already cancelled. */ - ConcurrentMap<Long, QueryResults> results() { - return res; + boolean cancelled(long reqId) { + return qryHist.get(reqId) != null; } /** - * @param qryId Query ID. - * @return {@code False} if query was already cancelled. + * @param reqId Query Request ID. + * @return {@code True} if cancelled. */ - boolean cancelled(long qryId) { - return qryHist.get(qryId) != null; + boolean onCancel(long reqId) { + Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE); + + return old == null; } /** - * @param qryId Query ID. - * @return {@code True} if cancelled. + * @param reqId Query Request ID. + * @param segmentId Index segment ID. + * @return query partial results. */ - boolean onCancel(long qryId) { - Boolean old = qryHist.putIfAbsent(qryId, Boolean.FALSE); + public QueryResults get(long reqId, int segmentId) { + return res.get(new RequestKey(reqId, segmentId)); + } - return old == null; + /** + * Cancel all thread of given request. + * @param reqID Request ID. + */ + public void cancelRequest(long reqID) { + for (RequestKey key : res.keySet()) { + if (key.reqId == reqID) { + QueryResults removed = res.remove(key); + + if (removed != null) + removed.cancel(true); + } + + } + } + + /** + * @param reqId Query Request ID. + * @param segmentId Index segment ID. + * @param qr Query Results. + * @return {@code True} if removed. 
+ */ + public boolean remove(long reqId, int segmentId, QueryResults qr) { + return res.remove(new RequestKey(reqId, segmentId), qr); + } + + /** + * @param reqId Query Request ID. + * @param segmentId Index segment ID. + * @param qr Query Results. + * @return previous value. + */ + public QueryResults put(long reqId, int segmentId, QueryResults qr) { + return res.put(new RequestKey(reqId, segmentId), qr); + } + + /** + * Cancel all node queries. + */ + public void cancelAll() { + for (QueryResults ress : res.values()) + ress.cancel(true); + } + + /** + * + */ + private static class RequestKey { + /** */ + private long reqId; + + /** */ + private int segmentId; + + /** Constructor */ + RequestKey(long reqId, int segmentId) { + this.reqId = reqId; + this.segmentId = segmentId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + RequestKey other = (RequestKey)o; + + return reqId == other.reqId && segmentId == other.segmentId; + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = (int)(reqId ^ (reqId >>> 32)); + result = 31 * result + segmentId; + return result; + } } } @@ -836,7 +960,8 @@ public class GridMapQueryExecutor { * @param qrys Number of queries. * @param cctx Cache context. 
*/ - private QueryResults(long qryReqId, int qrys, GridCacheContext<?,?> cctx) { + @SuppressWarnings("unchecked") + private QueryResults(long qryReqId, int qrys, GridCacheContext<?, ?> cctx) { this.qryReqId = qryReqId; this.cctx = cctx; http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index c267f4a..45d3c58 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -59,7 +59,7 @@ public abstract class GridMergeIndex extends BaseIndex { private final AtomicInteger expRowsCnt = new AtomicInteger(0); /** Remaining rows per source node ID. */ - private Map<UUID, Counter> remainingRows; + private Map<UUID, Counter[]> remainingRows; /** */ private final AtomicBoolean lastSubmitted = new AtomicBoolean(); @@ -141,15 +141,22 @@ public abstract class GridMergeIndex extends BaseIndex { * Set source nodes. * * @param nodes Nodes. + * @param segmentsCnt Index segments per table. 
*/ - public void setSources(Collection<ClusterNode> nodes) { + public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) { assert remainingRows == null; remainingRows = U.newHashMap(nodes.size()); for (ClusterNode node : nodes) { - if (remainingRows.put(node.id(), new Counter()) != null) + Counter[] counters = new Counter[segmentsCnt]; + + for (int i = 0; i < segmentsCnt; i++) + counters[i] = new Counter(); + + if (remainingRows.put(node.id(), counters) != null) throw new IllegalStateException("Duplicate node id: " + node.id()); + } } @@ -194,7 +201,7 @@ public abstract class GridMergeIndex extends BaseIndex { public final void addPage(GridResultPage page) { int pageRowsCnt = page.rowsInPage(); - Counter cnt = remainingRows.get(page.source()); + Counter cnt = remainingRows.get(page.source())[page.res.segmentId()]; // RemainingRowsCount should be updated before page adding to avoid race // in GridMergeIndexUnsorted cursor iterator @@ -231,13 +238,14 @@ public abstract class GridMergeIndex extends BaseIndex { // Guarantee that finished state possible only if counter is zero and all pages was added cnt.state = State.FINISHED; - for (Counter c : remainingRows.values()) { // Check all the sources. - if (c.state != State.FINISHED) - return; + for (Counter[] cntrs : remainingRows.values()) { // Check all the sources. + for(int i = 0; i < cntrs.length; i++) { + if (cntrs[i].state != State.FINISHED) + return; + } } if (lastSubmitted.compareAndSet(false, true)) { - // Add page-marker that last page was added addPage0(new GridResultPage(null, page.source(), null) { @Override public boolean isLast() { return true; @@ -256,7 +264,20 @@ public abstract class GridMergeIndex extends BaseIndex { * @param page Page. 
*/ protected void fetchNextPage(GridResultPage page) { - if (remainingRows.get(page.source()).get() != 0) + assert !page.isLast(); + + if(page.isFail()) + page.fetchNextPage(); //rethrow exceptions + + assert page.res != null; + + Counter[] counters = remainingRows.get(page.source()); + + int segId = page.res.segmentId(); + + Counter counter = counters[segId]; + + if (counter.get() != 0) page.fetchNextPage(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index f54fab6..8837046 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -62,9 +62,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; -import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridQueryCancel; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import 
org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter; @@ -100,6 +100,7 @@ import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE; /** @@ -294,6 +295,7 @@ public class GridReduceQueryExecutor { private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) { final long qryReqId = msg.queryRequestId(); final int qry = msg.query(); + final int seg = msg.segmentId(); final QueryRun r = runs.get(qryReqId); @@ -326,7 +328,7 @@ public class GridReduceQueryExecutor { } try { - GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize); + GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, seg, pageSize); if (node.isLocal()) h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0); @@ -514,29 +516,33 @@ public class GridReduceQueryExecutor { // Explicit partition mapping for unstable topology. Map<ClusterNode, IntArray> partsMap = null; - if (isPreloadingActive(cctx, extraSpaces)) { - if (cctx.isReplicated()) - nodes = replicatedUnstableDataNodes(cctx, extraSpaces); - else { - partsMap = partitionedUnstableDataNodes(cctx, extraSpaces); + if (qry.isLocal()) + nodes = Collections.singleton(ctx.discovery().localNode()); + else { + if (isPreloadingActive(cctx, extraSpaces)) { + if (cctx.isReplicated()) + nodes = replicatedUnstableDataNodes(cctx, extraSpaces); + else { + partsMap = partitionedUnstableDataNodes(cctx, extraSpaces); - nodes = partsMap == null ? null : partsMap.keySet(); + nodes = partsMap == null ? 
null : partsMap.keySet(); + } } - } - else - nodes = stableDataNodes(topVer, cctx, extraSpaces); + else + nodes = stableDataNodes(topVer, cctx, extraSpaces); - if (nodes == null) - continue; // Retry. + if (nodes == null) + continue; // Retry. - assert !nodes.isEmpty(); + assert !nodes.isEmpty(); - if (cctx.isReplicated() || qry.explain()) { - assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) : - "We must be on a client node."; + if (cctx.isReplicated() || qry.explain()) { + assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) : + "We must be on a client node."; - // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. - nodes = Collections.singleton(F.rand(nodes)); + // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. + nodes = Collections.singleton(F.rand(nodes)); + } } final Collection<ClusterNode> finalNodes = nodes; @@ -545,6 +551,8 @@ public class GridReduceQueryExecutor { final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable(); + final int segmentsPerIndex = cctx.config().getQueryParallelism(); + for (GridCacheSqlQuery mapQry : qry.mapQueries()) { GridMergeIndex idx; @@ -565,12 +573,12 @@ public class GridReduceQueryExecutor { else idx = GridMergeIndexUnsorted.createDummy(ctx); - idx.setSources(nodes); + idx.setSources(nodes, segmentsPerIndex); r.idxs.add(idx); } - r.latch = new CountDownLatch(r.idxs.size() * nodes.size()); + r.latch = new CountDownLatch(r.idxs.size() * nodes.size() * segmentsPerIndex); runs.put(qryReqId, r); @@ -626,14 +634,13 @@ public class GridReduceQueryExecutor { .tables(distributedJoins ? qry.tables() : null) .partitions(convert(partsMap)) .queries(mapQrys) - .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0) + .flags((qry.isLocal() ? GridH2QueryRequest.FLAG_IS_LOCAL : 0) | + (distributedJoins ? 
GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0)) .timeout(timeoutMillis), oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null, - distributedJoins) - ) { - awaitAllReplies(r, nodes); + false)) { - cancel.checkCancelled(); + awaitAllReplies(r, nodes, cancel); Object state = r.state.get(); @@ -696,7 +703,7 @@ public class GridReduceQueryExecutor { h2.setupConnection(r.conn, false, enforceJoinOrder); GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE) - .pageSize(r.pageSize).distributedJoins(false)); + .pageSize(r.pageSize).distributedJoinMode(OFF)); try { if (qry.explain()) @@ -817,11 +824,15 @@ public class GridReduceQueryExecutor { /** * @param r Query run. * @param nodes Nodes to check periodically if they alive. + * @param cancel Query cancel. * @throws IgniteInterruptedCheckedException If interrupted. */ - private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes) - throws IgniteInterruptedCheckedException { + private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel) + throws IgniteInterruptedCheckedException, QueryCancelledException { while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) { + + cancel.checkCancelled(); + for (ClusterNode node : nodes) { if (!ctx.discovery().alive(node)) { handleNodeLeft(r, node.id()); http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java index e49c48f..b2548cc 100644 --- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java @@ -38,6 +38,12 @@ public class GridH2IndexRangeRequest implements Message { private long qryId; /** */ + private int originSegmentId; + + /** */ + private int segmentId; + + /** */ private int batchLookupId; /** */ @@ -87,6 +93,34 @@ public class GridH2IndexRangeRequest implements Message { } /** + * @param segmentId Index segment ID. + */ + public void segment(int segmentId) { + this.segmentId = segmentId; + } + + /** + * @return Index segment ID. + */ + public int segment() { + return segmentId; + } + + /** + * @return Origin index segment ID. + */ + public int originSegmentId() { + return originSegmentId; + } + + /** + * @param segmentId Origin index segment ID. + */ + public void originSegmentId(int segmentId) { + this.originSegmentId = segmentId; + } + + /** * @param batchLookupId Batch lookup ID. 
*/ public void batchLookupId(int batchLookupId) { @@ -136,6 +170,17 @@ public class GridH2IndexRangeRequest implements Message { writer.incrementState(); + case 4: + if (!writer.writeInt("segmentId", segmentId)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeInt("originSegId", originSegmentId)) + return false; + + writer.incrementState(); } return true; @@ -181,6 +226,21 @@ public class GridH2IndexRangeRequest implements Message { reader.incrementState(); + case 4: + segmentId = reader.readInt("segmentId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + originSegmentId = reader.readInt("originSegId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return reader.afterMessageRead(GridH2IndexRangeRequest.class); @@ -193,7 +253,7 @@ public class GridH2IndexRangeRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 4; + return 6; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java index c6414bd..4d3db12 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java @@ -47,6 +47,12 @@ public class GridH2IndexRangeResponse implements Message { private long qryId; /** */ + private int segmentId; + + /** */ + private int originSegmentId; + + /** */ private int batchLookupId; /** */ @@ -130,6 +136,34
@@ public class GridH2IndexRangeResponse implements Message { } /** + * @param segmentId Index segment ID. + */ + public void segment(int segmentId) { + this.segmentId = segmentId; + } + + /** + * @return Index segment ID. + */ + public int segment() { + return segmentId; + } + + /** + * @return Origin index segment ID. + */ + public int originSegmentId() { + return originSegmentId; + } + + /** + * @param segmentId Origin index segment ID. + */ + public void originSegmentId(int segmentId) { + this.originSegmentId = segmentId; + } + + /** * @param batchLookupId Batch lookup ID. */ public void batchLookupId(int batchLookupId) { @@ -191,6 +225,17 @@ public class GridH2IndexRangeResponse implements Message { writer.incrementState(); + case 6: + if (!writer.writeInt("originSegId", originSegmentId)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeInt("segmentId", segmentId)) + return false; + + writer.incrementState(); } return true; @@ -252,6 +297,21 @@ public class GridH2IndexRangeResponse implements Message { reader.incrementState(); + case 6: + originSegmentId = reader.readInt("originSegId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + segmentId = reader.readInt("segmentId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return reader.afterMessageRead(GridH2IndexRangeResponse.class); @@ -264,7 +324,7 @@ public class GridH2IndexRangeResponse implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 8; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java index 884173f..ec49aff 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java @@ -50,6 +50,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { */ public static int FLAG_DISTRIBUTED_JOINS = 1; + /** + * Restrict distributed joins range-requests to local index segments. Range requests to other nodes will not be sent. + */ + public static int FLAG_IS_LOCAL = 2; + /** */ private long reqId; http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java new file mode 100644 index 0000000..f8c9dd5 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import javax.cache.Cache; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheKeyConfiguration; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests distributed and local queries over indexes that consist of many segments.
+ */ +public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static int QRY_PARALLELISM_LVL = 97; + + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheKeyConfiguration keyCfg = new CacheKeyConfiguration("MyCache", "affKey"); + + cfg.setCacheKeyConfiguration(keyCfg); + + cfg.setPeerClassLoadingEnabled(false); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** + * @param name Cache name. + * @param partitioned Partition or replicated cache. + * @param idxTypes Indexed types. + * @return Cache configuration. + */ + private static <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned, Class<?>... idxTypes) { + return new CacheConfiguration<K, V>() + .setName(name) + .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED) + .setQueryParallelism(partitioned ? QRY_PARALLELISM_LVL : 1) + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setIndexedTypes(idxTypes); + } + + /** + * Run tests on single-node grid + * @throws Exception If failed. + */ + public void testSingleNodeIndexSegmentation() throws Exception { + startGridsMultiThreaded(1, true); + + ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class)); + ignite(0).createCache(cacheConfig("org", true, Integer.class, Organization.class)); + + fillCache(); + + checkDistributedQueryWithSegmentedIndex(); + + checkLocalQueryWithSegmentedIndex(); + } + + /** + * Run tests on multi-node grid + * @throws Exception If failed. 
+ */ + public void testMultiNodeIndexSegmentation() throws Exception { + startGridsMultiThreaded(4, true); + + ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class)); + ignite(0).createCache(cacheConfig("org", true, Integer.class, Organization.class)); + + fillCache(); + + checkDistributedQueryWithSegmentedIndex(); + + checkLocalQueryWithSegmentedIndex(); + } + + /** + * Run tests on multi-node grid + * @throws Exception If failed. + */ + public void testMultiNodeSegmentedPartitionedWithReplicated() throws Exception { + startGridsMultiThreaded(4, true); + + ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class)); + ignite(0).createCache(cacheConfig("org", false, Integer.class, Organization.class)); + + fillCache(); + + checkDistributedQueryWithSegmentedIndex(); + + checkLocalQueryWithSegmentedIndex(); + } + + /** + * Check distributed joins. + * @throws Exception If failed. + */ + public void checkDistributedQueryWithSegmentedIndex() throws Exception { + IgniteCache<Integer, Person> c1 = ignite(0).cache("pers"); + + int expectedPersons = 0; + + for (Cache.Entry<Integer, Person> e : c1) { + final Integer orgId = e.getValue().orgId; + + if (10 <= orgId && orgId < 500) + expectedPersons++; + } + + String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key"; + + List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll(); + + assertEquals(expectedPersons, result.size()); + } + + /** + * Test local query. + * @throws Exception If failed. 
+ */ + public void checkLocalQueryWithSegmentedIndex() throws Exception { + IgniteCache<Integer, Person> c1 = ignite(0).cache("pers"); + IgniteCache<Integer, Organization> c2 = ignite(0).cache("org"); + + Set<Integer> localOrgIds = new HashSet<>(); + + for(Cache.Entry<Integer, Organization> e : c2.localEntries()) + localOrgIds.add(e.getKey()); + + int expectedPersons = 0; + + for (Cache.Entry<Integer, Person> e : c1.localEntries()) { + final Integer orgId = e.getValue().orgId; + + if (localOrgIds.contains(orgId)) + expectedPersons++; + } + + String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key"; + + List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setLocal(true)).getAll(); + + assertEquals(expectedPersons, result.size()); + } + + /** */ + private void fillCache() { + IgniteCache<Object, Object> c1 = ignite(0).cache("pers"); + + IgniteCache<Object, Object> c2 = ignite(0).cache("org"); + + final int orgCount = 500; + + for (int i = 0; i < orgCount; i++) + c2.put(i, new Organization("org-" + i)); + + final Random random = new Random(); + + for (int i = 0; i < 1000; i++) { + int orgID = 10 + random.nextInt(orgCount + 10); + + c1.put(i, new Person(orgID, "pers-" + i)); + } + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + @QuerySqlField(index = true) + Integer orgId; + + /** */ + @QuerySqlField + String name; + + /** + * + */ + public Person() { + // No-op. + } + + /** + * @param orgId Organization ID. + * @param name Name. + */ + public Person(int orgId, String name) { + this.orgId = orgId; + this.name = name; + } + } + + /** + * + */ + private static class Organization implements Serializable { + /** */ + @QuerySqlField + String name; + + /** + * + */ + public Organization() { + // No-op. + } + + /** + * @param name Organization name. 
+ */ + public Organization(String name) { + this.name = name; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java index 06afe7c..4ae2f91 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java @@ -33,9 +33,10 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheKeyConfiguration; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; -import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cluster.ClusterNode; @@ -701,6 +702,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { ignite(0).destroyCache(cache.getName()); } } + /** * @throws Exception If failed. */ @@ -750,6 +752,127 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. 
+ */ + public void testIndexSegmentation() throws Exception { + CacheConfiguration ccfg1 = cacheConfig("pers", true, + Integer.class, Person2.class).setQueryParallelism(4); + CacheConfiguration ccfg2 = cacheConfig("org", true, + Integer.class, Organization.class).setQueryParallelism(4); + + IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1); + IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2); + + try { + c2.put(1, new Organization("o1")); + c2.put(2, new Organization("o2")); + c1.put(3, new Person2(1, "p1")); + c1.put(4, new Person2(2, "p2")); + c1.put(5, new Person2(3, "p3")); + + String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1"; + + checkQueryPlan(c1, true, 1, new SqlFieldsQuery(select0)); + + checkQueryPlan(c1, true, 1, new SqlFieldsQuery(select0).setLocal(true)); + } + finally { + c1.destroy(); + c2.destroy(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReplicationCacheIndexSegmentationFailure() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + CacheConfiguration ccfg = cacheConfig("org", false, + Integer.class, Organization.class).setQueryParallelism(4); + + IgniteCache<Object, Object> c = ignite(0).createCache(ccfg); + + return null; + } + }, CacheException.class, "Cache index segmentation is supported for PARTITIONED mode only."); + } + + /** + * @throws Exception If failed. 
+ */ + public void testIndexSegmentationPartitionedReplicated() throws Exception { + CacheConfiguration ccfg1 = cacheConfig("pers", true, + Integer.class, Person2.class).setQueryParallelism(4); + CacheConfiguration ccfg2 = cacheConfig("org", false, + Integer.class, Organization.class); + + final IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1); + final IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2); + + try { + c2.put(1, new Organization("o1")); + c2.put(2, new Organization("o2")); + c1.put(3, new Person2(1, "p1")); + c1.put(4, new Person2(2, "p2")); + c1.put(5, new Person2(3, "p3")); + + String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key"; + + final SqlFieldsQuery qry = new SqlFieldsQuery(select0); + + qry.setDistributedJoins(true); + + List<List<?>> results = c1.query(qry).getAll(); + + assertEquals(2, results.size()); + } + finally { + c1.destroy(); + c2.destroy(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testIndexWithDifferentSegmentationLevelsFailure() throws Exception { + CacheConfiguration ccfg1 = cacheConfig("pers", true, + Integer.class, Person2.class).setQueryParallelism(4); + CacheConfiguration ccfg2 = cacheConfig("org", true, + Integer.class, Organization.class).setQueryParallelism(3); + + final IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1); + final IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2); + + try { + c2.put(1, new Organization("o1")); + c2.put(2, new Organization("o2")); + c1.put(3, new Person2(1, "p1")); + c1.put(4, new Person2(2, "p2")); + c1.put(5, new Person2(3, "p3")); + + String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1"; + + final SqlFieldsQuery qry = new SqlFieldsQuery(select0); + + qry.setDistributedJoins(true); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + c1.query(qry); + + return null; + } + }, CacheException.class, "Using indexes with different parallelism levels in same query is forbidden."); + } + finally { + c1.destroy(); + c2.destroy(); + } + } + + /** * @param cache Cache. * @param sql SQL. * @param enforceJoinOrder Enforce join order flag. 
@@ -789,26 +912,26 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { false, 0, select + - "from " + cache1 + "," + cache2 + " "+ where); + "from " + cache1 + "," + cache2 + " " + where); checkQueryPlan(cache, false, 0, select + - "from " + cache2 + "," + cache1 + " "+ where); + "from " + cache2 + "," + cache1 + " " + where); if (testEnforceJoinOrder) { checkQueryPlan(cache, true, 0, select + - "from " + cache1 + "," + cache2 + " "+ where); + "from " + cache1 + "," + cache2 + " " + where); checkQueryPlan(cache, true, 0, select + - "from " + cache2 + "," + cache1 + " "+ where); + "from " + cache2 + "," + cache1 + " " + where); } } @@ -823,7 +946,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { boolean enforceJoinOrder, int expBatchedJoins, String sql, - String...expText) { + String... expText) { checkQueryPlan(cache, enforceJoinOrder, expBatchedJoins, @@ -848,7 +971,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { boolean enforceJoinOrder, int expBatchedJoins, SqlFieldsQuery qry, - String...expText) { + String... expText) { qry.setEnforceJoinOrder(enforceJoinOrder); qry.setDistributedJoins(true); @@ -984,7 +1107,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { * @param args Arguments. * @return Column as list. */ - private static <X> List<X> columnQuery(IgniteCache<?,?> c, String qry, Object... args) { + private static <X> List<X> columnQuery(IgniteCache<?, ?> c, String qry, Object... 
args) { return column(0, c.query(new SqlFieldsQuery(qry).setArgs(args)).getAll()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java index 52084c7..d6a5fb1 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java @@ -230,16 +230,16 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertEquals(0, spi.size(typeAB.space(), typeAB)); assertEquals(0, spi.size(typeBA.space(), typeBA)); - assertFalse(spi.queryLocalSql(typeAA.space(), "select * from A.A", null, Collections.emptySet(), typeAA, null).hasNext()); - assertFalse(spi.queryLocalSql(typeAB.space(), "select * from A.B", null, Collections.emptySet(), typeAB, null).hasNext()); - assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A", null, Collections.emptySet(), typeBA, null).hasNext()); + assertFalse(spi.queryLocalSql(typeAA.space(), "select * from A.A", null, Collections.emptySet(), typeAA.name(), null, null).hasNext()); + assertFalse(spi.queryLocalSql(typeAB.space(), "select * from A.B", null, Collections.emptySet(), typeAB.name(), null, null).hasNext()); + assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A", null, Collections.emptySet(), typeBA.name(), null, null).hasNext()); assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A, A.B, A.A", null, - Collections.emptySet(), typeBA, null).hasNext()); + 
Collections.emptySet(), typeBA.name(), null, null).hasNext()); try { spi.queryLocalSql(typeBA.space(), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba", null, - Collections.emptySet(), typeBA, null).hasNext(); + Collections.emptySet(), typeBA.name(), null, null).hasNext(); fail("Enumerations of aliases in select block must be prohibited"); } @@ -248,10 +248,10 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract } assertFalse(spi.queryLocalSql(typeAB.space(), "select ab.* from A.B ab", null, - Collections.emptySet(), typeAB, null).hasNext()); + Collections.emptySet(), typeAB.name(), null, null).hasNext()); assertFalse(spi.queryLocalSql(typeBA.space(), "select ba.* from B.A as ba", null, - Collections.emptySet(), typeBA, null).hasNext()); + Collections.emptySet(), typeBA.name(), null, null).hasNext()); // Nothing to remove. spi.remove("A", key(1), aa(1, "", 10)); @@ -305,7 +305,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract // Query data. 
Iterator<IgniteBiTuple<Integer, Map<String, Object>>> res = - spi.queryLocalSql(typeAA.space(), "from a order by age", null, Collections.emptySet(), typeAA, null); + spi.queryLocalSql(typeAA.space(), "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null); assertTrue(res.hasNext()); assertEquals(aa(3, "Borya", 18).value(null, false), value(res.next())); @@ -314,7 +314,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertFalse(res.hasNext()); res = spi.queryLocalSql(typeAA.space(), "select aa.* from a aa order by aa.age", null, - Collections.emptySet(), typeAA, null); + Collections.emptySet(), typeAA.name(), null, null); assertTrue(res.hasNext()); assertEquals(aa(3, "Borya", 18).value(null, false), value(res.next())); @@ -322,7 +322,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertEquals(aa(2, "Valera", 19).value(null, false), value(res.next())); assertFalse(res.hasNext()); - res = spi.queryLocalSql(typeAB.space(), "from b order by name", null, Collections.emptySet(), typeAB, null); + res = spi.queryLocalSql(typeAB.space(), "from b order by name", null, Collections.emptySet(), typeAB.name(), null, null); assertTrue(res.hasNext()); assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").value(null, false), value(res.next())); @@ -331,7 +331,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertFalse(res.hasNext()); res = spi.queryLocalSql(typeAB.space(), "select bb.* from b as bb order by bb.name", null, - Collections.emptySet(), typeAB, null); + Collections.emptySet(), typeAB.name(), null, null); assertTrue(res.hasNext()); assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").value(null, false), value(res.next())); @@ -340,7 +340,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertFalse(res.hasNext()); - res = spi.queryLocalSql(typeBA.space(), "from a", null, 
Collections.emptySet(), typeBA, null); + res = spi.queryLocalSql(typeBA.space(), "from a", null, Collections.emptySet(), typeBA.name(), null, null); assertTrue(res.hasNext()); assertEquals(ba(2, "Kolya", 25, true).value(null, false), value(res.next())); @@ -725,4 +725,4 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract throw new UnsupportedOperationException(); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java b/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java new file mode 100644 index 0000000..d1d80f2 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.loadtests.h2indexing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+
+/**
+ * SQL query stress test.
+ * <p>
+ * Starts {@link #NODE_CNT} server nodes and one client node, loads {@link #ENTRIES_CNT} entries through the client,
+ * then runs the same SQL fields query from {@link #THREAD_CNT} threads for {@link #TIMEOUT} milliseconds while a
+ * separate thread prints the observed throughput. The first {@link CacheException} raised by a query thread is
+ * rethrown from {@link #main(String[])}.
+ */
+public class FetchingQueryCursorStressTest {
+    /** Number of server nodes; one client node is started in addition (see {@link #start()}). */
+    private static final int NODE_CNT = 4;
+
+    /** Number of entries. */
+    private static final int ENTRIES_CNT = 10_000;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Thread count. */
+    private static final int THREAD_CNT = 16;
+
+    /** Execution counter. */
+    private static final AtomicLong CNT = new AtomicLong();
+
+    /** Verbose mode. */
+    private static final boolean VERBOSE = false;
+
+    /** How long the query threads are left running, in milliseconds. */
+    private static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30);
+
+    /** First failure reported by a query thread, or {@code null} if none occurred. */
+    public static final AtomicReference<Exception> error = new AtomicReference<>();
+
+    /** Stop flag: set by {@link #main(String[])}, polled by the worker loops. */
+    private static volatile boolean stopped;
+
+    /**
+     * Entry point.
+     *
+     * @param args Command line arguments (not used).
+     * @throws Exception If the topology could not be started or a query thread reported a failure.
+     */
+    public static void main(String[] args) throws Exception {
+        List<Thread> threads = new ArrayList<>(THREAD_CNT + 1);
+
+        try (Ignite ignite = start()) {
+            IgniteCache<Integer, Person> cache = ignite.cache(CACHE_NAME);
+
+            loadData(ignite, cache);
+
+            System.out.println("Loaded data: " + cache.size());
+
+            for (int i = 0; i < THREAD_CNT; i++)
+                threads.add(startDaemon("qry-exec-" + i, new QueryExecutor(cache, "Select * from Person")));
+
+            threads.add(startDaemon("printer", new ThroughputPrinter()));
+
+            Thread.sleep(TIMEOUT);
+
+            // Workers loop until told to stop; without this signal the joins below never return on a clean run.
+            // A flag is used instead of Thread.interrupt() so that an in-flight query is not aborted and
+            // mistakenly recorded as a failure.
+            stopped = true;
+
+            for (Thread t : threads)
+                t.join();
+
+            if (error.get() != null)
+                throw error.get();
+        }
+        finally {
+            // Also covers early exits (e.g. a failed data load) so workers do not keep querying stopped nodes.
+            stopped = true;
+
+            Ignition.stopAll(false);
+        }
+    }
+
+    /**
+     * Start daemon thread.
+     *
+     * @param name Name.
+     * @param r Runnable.
+     * @return Started thread.
+     */
+    private static Thread startDaemon(String name, Runnable r) {
+        Thread t = new Thread(r);
+
+        t.setName(name);
+        t.setDaemon(true);
+
+        t.start();
+
+        return t;
+    }
+
+    /**
+     * Load data into Ignite.
+     *
+     * @param ignite Ignite.
+     * @param cache Cache.
+     * @throws Exception If the data streamer failed.
+     */
+    private static void loadData(Ignite ignite, IgniteCache<Integer, Person> cache) throws Exception {
+        try (IgniteDataStreamer<Object, Object> str = ignite.dataStreamer(cache.getName())) {
+            for (int id = 0; id < ENTRIES_CNT; id++)
+                str.addData(id, new Person(id, "John" + id, "Doe"));
+        }
+    }
+
+    /**
+     * Start topology: {@link #NODE_CNT} server nodes followed by a single client node.
+     *
+     * @return Client node.
+     */
+    private static Ignite start() {
+        int i = 0;
+
+        for (; i < NODE_CNT; i++)
+            Ignition.start(config(i, false));
+
+        return Ignition.start(config(i, true));
+    }
+
+    /**
+     * Create configuration.
+     *
+     * @param idx Index.
+     * @param client Client flag.
+     * @return Configuration.
+     */
+    @SuppressWarnings("unchecked")
+    private static IgniteConfiguration config(int idx, boolean client) {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName("grid-" + idx);
+        cfg.setClientMode(client);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setIndexedTypes(Integer.class, Person.class);
+        cfg.setMarshaller(new OptimizedMarshaller());
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setLocalHost("127.0.0.1");
+
+        return cfg;
+    }
+
+    /**
+     * Cache value type; registered as an indexed type in {@link #config(int, boolean)}.
+     */
+    private static class Person implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        @QuerySqlField
+        private int id;
+
+        /** */
+        @QuerySqlField
+        private String firstName;
+
+        /** */
+        @QuerySqlField
+        private String lastName;
+
+        /**
+         * @param id Person ID.
+         * @param firstName First name.
+         * @param lastName Last name.
+         */
+        public Person(int id, String firstName, String lastName) {
+            this.id = id;
+            this.firstName = firstName;
+            this.lastName = lastName;
+        }
+    }
+
+    /**
+     * Query runner.
+     */
+    private static class QueryExecutor implements Runnable {
+        /** Cache. */
+        private final IgniteCache<Integer, Person> cache;
+
+        /** SQL text executed on every iteration. */
+        private final String query;
+
+        /**
+         * Constructor.
+         *
+         * @param cache Cache.
+         * @param query SQL text to execute.
+         */
+        public QueryExecutor(IgniteCache<Integer, Person> cache, String query) {
+            this.cache = cache;
+            this.query = query;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            System.out.println("Executor started: " + Thread.currentThread().getName());
+
+            try {
+                // Exit on the stop signal, on a failure reported by any executor, or on interruption.
+                while (!stopped && error.get() == null && !Thread.currentThread().isInterrupted()) {
+                    long start = System.nanoTime();
+
+                    SqlFieldsQuery qry = new SqlFieldsQuery(query);
+
+                    Set<Integer> extIds = new HashSet<>();
+
+                    // Drain the whole cursor, collecting the first column of each row.
+                    for (List<?> next : cache.query(qry))
+                        extIds.add((Integer)next.get(0));
+
+                    long dur = (System.nanoTime() - start) / 1_000_000;
+
+                    CNT.incrementAndGet();
+
+                    if (VERBOSE)
+                        System.out.println("[extIds=" + extIds.size() + ", dur=" + dur + ']');
+                }
+            }
+            catch (CacheException ex) {
+                // Only the first failure is kept; setting it also stops the other workers.
+                error.compareAndSet(null, ex);
+            }
+        }
+    }
+
+    /**
+     * Throughput printer.
+     */
+    private static class ThroughputPrinter implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (!stopped && error.get() == null) {
+                long before = CNT.get();
+                long beforeTime = System.currentTimeMillis();
+
+                try {
+                    Thread.sleep(2000L);
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    return;
+                }
+
+                long after = CNT.get();
+                long afterTime = System.currentTimeMillis();
+
+                double res = 1000 * ((double)(after - before)) / (afterTime - beforeTime);
+
+                System.out.println((long)res + " ops/sec");
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 
386b8fd..b417b0a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -98,6 +98,7 @@ import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTes import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest; import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest; +import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest; import org.apache.ignite.internal.processors.query.h2.GridH2IndexRebuildTest; import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest; @@ -134,6 +135,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { // Queries tests. suite.addTestSuite(IgniteSqlSplitterSelfTest.class); + suite.addTestSuite(IgniteSqlSegmentedIndexSelfTest.class); suite.addTestSuite(IgniteSqlSchemaIndexingTest.class); suite.addTestSuite(GridCacheQueryIndexDisabledSelfTest.class); suite.addTestSuite(IgniteCacheQueryLoadSelfTest.class);
