http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java index b220291..4889069 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java @@ -144,11 +144,6 @@ public class GridQueryNextPageResponse implements Message { } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridQueryNextPageResponse.class, this); - } - - /** {@inheritDoc} */ @Override public void onAckReceived() { // No-op. } @@ -304,4 +299,11 @@ public class GridQueryNextPageResponse implements Message { public void retry(AffinityTopologyVersion retry) { this.retry = retry; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryNextPageResponse.class, this, + "valsSize", vals != null ? vals.size() : 0, + "rowsSize", plainRows != null ? plainRows.size() : 0); + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java index 60d348b..f7de86c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java @@ -20,13 +20,16 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteCodeGeneratingFail; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -35,8 +38,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Query request. */ +@Deprecated @IgniteCodeGeneratingFail -public class GridQueryRequest implements Message { +public class GridQueryRequest implements Message, GridCacheQueryMarshallable { /** */ private static final long serialVersionUID = 0L; @@ -165,11 +169,29 @@ public class GridQueryRequest implements Message { /** * @return Queries. */ - public Collection<GridCacheSqlQuery> queries() throws IgniteCheckedException { + public Collection<GridCacheSqlQuery> queries() { return qrys; } /** {@inheritDoc} */ + @Override public void marshall(Marshaller m) { + if (F.isEmpty(qrys)) + return; + + for (GridCacheSqlQuery qry : qrys) + qry.marshall(m); + } + + /** {@inheritDoc} */ + @Override public void unmarshall(Marshaller m, GridKernalContext ctx) { + if (F.isEmpty(qrys)) + return; + + for (GridCacheSqlQuery qry : qrys) + qry.unmarshall(m, ctx); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridQueryRequest.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 269795b..7a81cf8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -6703,6 +6703,19 @@ public abstract class IgniteUtils { } /** + * Swaps two objects in array. + * + * @param arr Array. + * @param a Index of the first object. + * @param b Index of the second object. + */ + public static void swap(Object[] arr, int a, int b) { + Object tmp = arr[a]; + arr[a] = arr[b]; + arr[b] = tmp; + } + + /** * Returns array which is the union of two arrays * (array of elements contained in any of provided arrays). * <p/> http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index ab31625..5eb27d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -2313,6 +2313,26 @@ public class GridFunc { } /** + * Converts collection of numbers to primitive {@code int[]} array. + * + * @param col Collection of numbers. + * @return Array of integers. + */ + public static int[] toIntArray(Collection<? extends Number> col) { + if (col == null) + return null; + + int[] res = new int[col.size()]; + + Iterator<? extends Number> iter = col.iterator(); + + for (int i = 0; i < res.length; i++) + res[i] = iter.next().intValue(); + + return res; + } + + /** * Utility map getter. This method analogous to {@link #addIfAbsent(Map, Object, Callable)} * method but this one doesn't put the default value into the map when key is not found. * http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffHeapSnapTreeMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffHeapSnapTreeMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffHeapSnapTreeMap.java index 04ea56c..7bde651 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffHeapSnapTreeMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridOffHeapSnapTreeMap.java @@ -35,7 +35,6 @@ package org.apache.ignite.internal.util.offheap.unsafe; -import java.io.Closeable; import java.util.AbstractMap; import java.util.AbstractSet; import java.util.ArrayList; @@ -51,10 +50,11 @@ import java.util.SortedSet; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.util.typedef.internal.SB; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -107,7 +107,7 @@ import org.jsr166.ConcurrentHashMap8; */ @SuppressWarnings("ALL") public class GridOffHeapSnapTreeMap<K extends GridOffHeapSmartPointer,V extends GridOffHeapSmartPointer> - extends AbstractMap<K,V> implements ConcurrentNavigableMap<K, V>, Cloneable, Closeable { + extends AbstractMap<K,V> implements ConcurrentNavigableMap<K, V>, Cloneable, AutoCloseable, GridReservable { /** This is a special value that indicates that an optimistic read failed. */ private static final GridOffHeapSmartPointer SpecialRetry = new GridOffHeapSmartPointer() { @Override public long pointer() { @@ -937,17 +937,17 @@ public class GridOffHeapSnapTreeMap<K extends GridOffHeapSmartPointer,V extends protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); /** */ - private final AtomicBoolean stopped = new AtomicBoolean(false); + private boolean stopped; /** {@inheritDoc} */ @Override public boolean add(long node) { - ReentrantReadWriteLock.ReadLock l = lock.readLock(); + Lock l = lock.readLock(); if (!l.tryLock()) return false; try { - return super.add(node); + return !stopped && super.add(node); } finally { l.unlock(); @@ -956,13 +956,13 @@ public class GridOffHeapSnapTreeMap<K extends GridOffHeapSmartPointer,V extends /** {@inheritDoc} */ @Override public boolean add(RecycleQueue que) { - ReentrantReadWriteLock.ReadLock l = lock.readLock(); + Lock l = lock.readLock(); if (!l.tryLock()) return false; try { - return super.add(que); + return !stopped && super.add(que); } finally { l.unlock(); @@ -980,20 +980,21 @@ public class GridOffHeapSnapTreeMap<K extends GridOffHeapSmartPointer,V extends * @return {@code true} If we stopped this queue. */ public boolean stop() { - if (stopped.compareAndSet(false, true)) { - lock.writeLock().lock(); + Lock l = lock.writeLock(); - return true; - } + l.lock(); - return false; - } + try { + if (stopped) + return false; - /** - * @return {@code true} If this queue is stopped.. - */ - public boolean isStopped() { - return stopped.get(); + stopped = true; + + return true; + } + finally { + l.unlock(); + } } } @@ -1046,6 +1047,12 @@ public class GridOffHeapSnapTreeMap<K extends GridOffHeapSmartPointer,V extends /** Recycle queue for this snapshot. */ private volatile StoppableRecycleQueue recycleBin; + /** */ + private AtomicInteger reservations; + + /** */ + private volatile boolean closing; + /** * @param keyFactory Key factory. * @param valFactory Value factory. @@ -1079,10 +1086,53 @@ public class GridOffHeapSnapTreeMap<K extends GridOffHeapSmartPointer,V extends this.holderRef = rootHolder(); } + /** {@inheritDoc} */ + @Override public boolean reserve() { + for (;;) { + int r = reservations.get(); + + if (r == -1) + return false; + + assert r >= 0 : r; + + if (reservations.compareAndSet(r, r + 1)) + return true; + } + } + + /** {@inheritDoc} */ + @Override public void release() { + for (;;) { + int r = reservations.get(); + + assert r > 0 : r; + + int z = r == 1 && closing ? -1 : r - 1; + + if (reservations.compareAndSet(r, z)) { + if (z == -1) + doClose(); + + return; + } + } + } + /** * Closes tree map and reclaims memory. */ - public void close() { + @Override public void close() { + closing = true; + + if (reservations == null || reservations.compareAndSet(0, -1)) + doClose(); + } + + /** + * + */ + private void doClose() { RecycleQueue q; if (snapshotId.longValue() == 0) { @@ -1142,6 +1192,7 @@ public class GridOffHeapSnapTreeMap<K extends GridOffHeapSmartPointer,V extends copy.holderRef = rootHolder(holderRef); markShared(root()); + copy.reservations = new AtomicInteger(); copy.size = new AtomicInteger(size()); copy.recycleBin = new StoppableRecycleQueue(); http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java index 9392cc1..37352e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java @@ -51,7 +51,7 @@ public class TcpDiscoveryJdbcIpFinderSelfTest extends dataSrc.setDriverClass("org.h2.Driver"); if (initSchema) - dataSrc.setJdbcUrl("jdbc:h2:mem"); + dataSrc.setJdbcUrl("jdbc:h2:mem:./test"); else { dataSrc.setJdbcUrl("jdbc:h2:mem:jdbc_ipfinder_not_initialized_schema"); http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 87509a4..e475754 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; import javax.cache.CacheException; @@ -47,6 +48,7 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; @@ -529,7 +531,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { ", locNode=" + g.cluster().localNode() + ']'); } - Thread.sleep(200); // Busy wait. + Thread.sleep(20); // Busy wait. continue; } @@ -1177,4 +1179,39 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { } } } + + /** + * @param aff Affinity. + * @param key Counter. + * @param node Target node. + * @return Key. + */ + protected final Integer keyForNode(Affinity<Object> aff, AtomicInteger key, ClusterNode node) { + for (int i = 0; i < 100_000; i++) { + Integer next = key.getAndIncrement(); + + if (aff.mapKeyToNode(next).equals(node)) + return next; + } + + fail("Failed to find key for node: " + node); + + return null; + } + + /** + * @param cache Cache. + * @param qry Query. + * @return Query plan. + */ + protected final String queryPlan(IgniteCache<?, ?> cache, SqlFieldsQuery qry) { + return (String)cache.query(new SqlFieldsQuery("explain " + qry.getSql()) + .setArgs(qry.getArgs()) + .setLocal(qry.isLocal()) + .setCollocated(qry.isCollocated()) + .setPageSize(qry.getPageSize()) + .setDistributedJoins(qry.isDistributedJoins()) + .setEnforceJoinOrder(qry.isEnforceJoinOrder())) + .getAll().get(0).get(0); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java ---------------------------------------------------------------------- diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java index 29b7dd4..3062d13 100644 --- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java +++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java @@ -28,29 +28,31 @@ import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.h2.engine.Constants; import org.h2.engine.Session; import org.h2.index.Cursor; -import org.h2.index.IndexCondition; import org.h2.index.IndexType; import org.h2.index.SingleRowCursor; import org.h2.index.SpatialIndex; +import org.h2.index.SpatialTreeIndex; import org.h2.message.DbException; import org.h2.mvstore.MVStore; import org.h2.mvstore.rtree.MVRTreeMap; import org.h2.mvstore.rtree.SpatialKey; import org.h2.result.SearchRow; import org.h2.result.SortOrder; -import org.h2.table.Column; import org.h2.table.IndexColumn; import org.h2.table.Table; import org.h2.table.TableFilter; import org.h2.value.Value; import org.h2.value.ValueGeometry; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL; /** * Spatial index. */ +@SuppressWarnings("unused"/*reflection*/) public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex { /** */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -80,12 +82,8 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex * @param tbl Table. * @param idxName Index name. * @param cols Columns. - * @param keyCol Key column. - * @param valCol Value column. */ - public GridH2SpatialIndex(Table tbl, String idxName, IndexColumn[] cols, int keyCol, int valCol) { - super(keyCol, valCol); - + public GridH2SpatialIndex(Table tbl, String idxName, IndexColumn... cols) { if (cols.length > 1) throw DbException.getUnsupportedException("can only do one column"); @@ -121,7 +119,14 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex } /** {@inheritDoc} */ + @Nullable @Override protected Object doTakeSnapshot() { + return null; // TODO We do not support snapshots, but probably this is possible. + } + + /** {@inheritDoc} */ @Override public GridH2Row put(GridH2Row row) { + assert row instanceof GridH2AbstractKeyValueRow : "requires key to be at 0"; + Lock l = lock.writeLock(); l.lock(); @@ -129,7 +134,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex try { checkClosed(); - Value key = row.getValue(keyCol); + Value key = row.getValue(KEY_COL); assert key != null; @@ -183,7 +188,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex try { checkClosed(); - Value key = row.getValue(keyCol); + Value key = row.getValue(KEY_COL); assert key != null; @@ -208,7 +213,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex } /** {@inheritDoc} */ - @Override public void close(Session ses) { + @Override public void destroy() { Lock l = lock.writeLock(); l.lock(); @@ -221,37 +226,31 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex finally { l.unlock(); } + + super.destroy(); } /** {@inheritDoc} */ - @Override protected long getCostRangeIndex(int[] masks, long rowCnt, TableFilter filter, SortOrder sortOrder) { - rowCnt += Constants.COST_ROW_OFFSET; - long cost = rowCnt; - long rows = rowCnt; - - if (masks == null) - return cost; - - for (Column column : columns) { - int idx = column.getColumnId(); - int mask = masks[idx]; - if ((mask & IndexCondition.SPATIAL_INTERSECTS) != 0) { - cost = 3 + rows / 4; - - break; - } - } - - return cost; + @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) { + return SpatialTreeIndex.getCostRangeIndex(masks, + table.getRowCountApproximation(), columns) / 10; } /** {@inheritDoc} */ - @Override public double getCost(Session ses, int[] masks, TableFilter filter, SortOrder sortOrder) { - return getCostRangeIndex(masks, rowCnt, filter, sortOrder); + @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) { + return find0(filter); } /** {@inheritDoc} */ @Override public Cursor find(Session ses, SearchRow first, SearchRow last) { + return find0(null); + } + + /** + * @param filter Table filter. + * @return Cursor. + */ + private Cursor find0(TableFilter filter) { Lock l = lock.readLock(); l.lock(); @@ -259,7 +258,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex try { checkClosed(); - return new GridH2Cursor(rowIterator(treeMap.keySet().iterator())); + return new GridH2Cursor(rowIterator(treeMap.keySet().iterator(), filter)); } finally { l.unlock(); @@ -273,9 +272,10 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex /** * @param i Spatial key iterator. + * @param filter Table filter. * @return Iterator over rows. */ - private Iterator<GridH2Row> rowIterator(Iterator<SpatialKey> i) { + private Iterator<GridH2Row> rowIterator(Iterator<SpatialKey> i, TableFilter filter) { if (!i.hasNext()) return Collections.emptyIterator(); @@ -290,7 +290,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex } while (i.hasNext()); - return filter(rows.iterator()); + return filter(rows.iterator(), threadLocalFilter()); } /** {@inheritDoc} */ @@ -305,7 +305,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex if (!first) throw DbException.throwInternalError("Spatial Index can only be fetch by ascending order"); - Iterator<GridH2Row> iter = rowIterator(treeMap.keySet().iterator()); + Iterator<GridH2Row> iter = rowIterator(treeMap.keySet().iterator(), null); return new SingleRowCursor(iter.hasNext() ? iter.next() : null); } @@ -334,7 +334,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex if (intersection == null) return find(filter.getSession(), null, null); - return new GridH2Cursor(rowIterator(treeMap.findIntersectingKeys(getEnvelope(intersection, 0)))); + return new GridH2Cursor(rowIterator(treeMap.findIntersectingKeys(getEnvelope(intersection, 0)), filter)); } finally { l.unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 0b0fe85..535881e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -64,6 +64,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; @@ -74,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; @@ -82,14 +84,15 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryIndexing; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; -import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine; import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap; import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.opt.GridH2TreeIndex; -import org.apache.ignite.internal.processors.query.h2.opt.GridH2Utils; import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; import org.apache.ignite.internal.processors.query.h2.opt.GridLuceneIndex; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter; @@ -100,37 +103,44 @@ import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.LoggerResource; -import org.apache.ignite.spi.IgniteSpiCloseableIterator; import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.h2.api.ErrorCode; import org.h2.api.JavaObjectSerializer; import org.h2.command.CommandInterface; -import org.h2.constant.ErrorCode; -import org.h2.constant.SysProperties; +import org.h2.engine.Session; +import org.h2.engine.SysProperties; import org.h2.index.Index; import org.h2.index.SpatialIndex; +import org.h2.jdbc.JdbcConnection; import org.h2.jdbc.JdbcPreparedStatement; import org.h2.message.DbException; import org.h2.mvstore.cache.CacheLongKeyLIRS; +import org.h2.result.SortOrder; import org.h2.server.web.WebServer; import org.h2.table.Column; import org.h2.table.IndexColumn; import org.h2.table.Table; import org.h2.tools.Server; -import org.h2.util.Utils; +import org.h2.util.JdbcUtils; import org.h2.value.DataType; import org.h2.value.Value; import org.h2.value.ValueArray; @@ -149,6 +159,7 @@ import org.h2.value.ValueNull; import org.h2.value.ValueShort; import org.h2.value.ValueString; import org.h2.value.ValueTime; +import org.h2.value.ValueTimestamp; import org.h2.value.ValueUuid; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -161,9 +172,8 @@ import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FUL import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL; -import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL; -import static org.h2.result.SortOrder.ASCENDING; -import static org.h2.result.SortOrder.DESCENDING; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE; /** * Indexing implementation based on H2 database engine. In this implementation main query language is SQL, @@ -181,8 +191,10 @@ import static org.h2.result.SortOrder.DESCENDING; public class IgniteH2Indexing implements GridQueryIndexing { /** Default DB options. */ private static final String DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" + - ";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0;" + - "RECOMPILE_ALWAYS=1;MAX_OPERATION_MEMORY=0"; + ";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0" + + ";RECOMPILE_ALWAYS=1;MAX_OPERATION_MEMORY=0;NESTED_JOINS=0;BATCH_JOINS=1" + + ";ROW_FACTORY=\"" + GridH2RowFactory.class.getName() + "\"" + + ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName(); /** */ private static final int PREPARED_STMT_CACHE_SIZE = 256; @@ -219,9 +231,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { * Command in H2 prepared statement. */ static { - try { - System.setProperty("h2.objectCache", "false"); + // Initialize system properties for H2. + System.setProperty("h2.objectCache", "false"); + System.setProperty("h2.serializeJavaObject", "false"); + System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching. + try { COMMAND_FIELD = JdbcPreparedStatement.class.getDeclaredField("command"); COMMAND_FIELD.setAccessible(true); @@ -260,6 +275,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { private final Map<String, String> space2schema = new ConcurrentHashMap8<>(); /** */ + private GridSpinBusyLock busyLock; + + /** */ private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() { @Nullable @Override public ConnectionWrapper get() { ConnectionWrapper c = super.get(); @@ -304,13 +322,35 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ private volatile GridKernalContext ctx; + /** */ + private final ConcurrentMap<String, GridH2Table> dataTables = new ConcurrentHashMap8<>(); + /** Statement cache. */ private final ConcurrentHashMap<Thread, StatementCache> stmtCache = new ConcurrentHashMap<>(); /** */ - private final GridBoundedConcurrentLinkedHashMap<T3<String, String, Boolean>, TwoStepCachedQuery> twoStepCache = + private final GridBoundedConcurrentLinkedHashMap<TwoStepCachedQueryKey, TwoStepCachedQuery> twoStepCache = new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE); + /** */ + private final IgniteInClosure<? super IgniteInternalFuture<?>> logger = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, e.getMessage(), e); + } + } + }; + + /** + * @return Kernal context. + */ + public GridKernalContext kernalContext() { + return ctx; + } + /** * @param space Space. * @return Connection. @@ -695,8 +735,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { U.close(stmt, log); } - tbl.tbl.close(); - if (tbl.luceneIdx != null) U.closeQuiet(tbl.luceneIdx); @@ -705,7 +743,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryText( + @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText( @Nullable String spaceName, String qry, GridQueryTypeDescriptor type, IndexingQueryFilter filters) throws IgniteCheckedException { TableDescriptor tbl = tableDescriptor(spaceName, type); @@ -727,14 +765,14 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public GridQueryFieldsResult queryFields(@Nullable final String spaceName, final String qry, - @Nullable final Collection<Object> params, final IndexingQueryFilter filters) + @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry, + @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder) throws IgniteCheckedException { - setFilters(filters); + Connection conn = connectionForSpace(spaceName); - try { - Connection conn = connectionForThread(schema(spaceName)); + initLocalQueryContext(conn, enforceJoinOrder, filters); + try { ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, qry, params, true); List<GridQueryFieldMetadata> meta = null; @@ -751,7 +789,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { return new GridQueryFieldsResultAdapter(meta, new FieldsIterator(rs)); } finally { - setFilters(null); + GridH2QueryContext.clearThreadLocal(); } } @@ -897,25 +935,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * Executes query. - * - * @param space Space. - * @param qry Query. - * @param params Query parameters. - * @param tbl Target table of query to generate select. - * @return Result set. - * @throws IgniteCheckedException If failed. - */ - private ResultSet executeQuery(String space, String qry, @Nullable Collection<Object> params, TableDescriptor tbl) - throws IgniteCheckedException { - Connection conn = connectionForThread(tbl.schemaName()); - - String sql = generateQuery(qry, tbl); - - return executeSqlQueryWithTimer(space, conn, sql, params, true); - } - - /** * Binds parameters to prepared statement. * * @param stmt Prepared statement. @@ -932,43 +951,66 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * Executes regular query. - * - * @param spaceName Space name. - * @param qry Query. - * @param params Query parameters. - * @param type Query return type. - * @param filters Space name and key filters. - * @return Queried rows. - * @throws IgniteCheckedException If failed. + * @param conn Connection. + * @param enforceJoinOrder Enforce join order of tables. + * @param filter Filter. */ + private void initLocalQueryContext(Connection conn, boolean enforceJoinOrder, IndexingQueryFilter filter) { + setupConnection(conn, false, enforceJoinOrder); + + GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false)); + } + + /** + * @param conn Connection to use. + * @param distributedJoins If distributed joins are enabled. + * @param enforceJoinOrder Enforce join order of tables. + */ + public void setupConnection(Connection conn, boolean distributedJoins, boolean enforceJoinOrder) { + Session s = session(conn); + + s.setForceJoinOrder(enforceJoinOrder); + s.setJoinBatchEnabled(distributedJoins); + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(@Nullable String spaceName, + @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, final String qry, @Nullable final Collection<Object> params, GridQueryTypeDescriptor type, - final IndexingQueryFilter filters) throws IgniteCheckedException { + final IndexingQueryFilter filter) throws IgniteCheckedException { final TableDescriptor tbl = tableDescriptor(spaceName, type); if (tbl == null) throw new CacheException("Failed to find SQL table for type: " + type.name()); - setFilters(filters); + String sql = generateQuery(qry, tbl); + + Connection conn = connectionForThread(tbl.schemaName()); + + initLocalQueryContext(conn, false, filter); try { - ResultSet rs = executeQuery(spaceName, qry, params, tbl); + ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true); return new KeyValIterator(rs); } finally { - setFilters(null); + GridH2QueryContext.clearThreadLocal(); } } - /** {@inheritDoc} */ - @Override public Iterable<List<?>> queryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry, - final boolean keepCacheObj) { + /** + * @param cctx Cache context. + * @param qry Query. + * @param keepCacheObj Flag to keep cache object. + * @param enforceJoinOrder Enforce join order of tables. + * @return Iterable result. + */ + private Iterable<List<?>> runQueryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry, + final boolean keepCacheObj, final boolean enforceJoinOrder) { return new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { - return rdcQryExec.query(cctx, qry, keepCacheObj); + return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder); } }; } @@ -997,6 +1039,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { fqry.setArgs(qry.getArgs()); fqry.setPageSize(qry.getPageSize()); + fqry.setDistributedJoins(qry.isDistributedJoins()); final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry); @@ -1030,6 +1073,14 @@ public class IgniteH2Indexing implements GridQueryIndexing { }; } + /** + * @param c Connection. + * @return Session. + */ + public static Session session(Connection c) { + return (Session)((JdbcConnection)c).getSession(); + } + /** {@inheritDoc} */ @Override public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, SqlFieldsQuery qry) { final String space = cctx.name(); @@ -1037,10 +1088,15 @@ public class IgniteH2Indexing implements GridQueryIndexing { Connection c = connectionForSpace(space); + final boolean enforceJoinOrder = qry.isEnforceJoinOrder(); + final boolean distributedJoins = qry.isDistributedJoins() && cctx.isPartitioned(); + final boolean grpByCollocated = qry.isCollocated(); + GridCacheTwoStepQuery twoStepQry; List<GridQueryFieldMetadata> meta; - final T3<String, String, Boolean> cachedQryKey = new T3<>(space, sqlQry, qry.isCollocated()); + final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, grpByCollocated, + distributedJoins, enforceJoinOrder); TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey); if (cachedQry != null) { @@ -1048,51 +1104,95 @@ public class IgniteH2Indexing implements GridQueryIndexing { meta = cachedQry.meta; } else { + final UUID locNodeId = ctx.localNodeId(); + + setupConnection(c, distributedJoins, enforceJoinOrder); + + GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE) + .distributedJoins(distributedJoins)); + PreparedStatement stmt; boolean cachesCreated = false; - while (true) { - try { - // Do not cache this statement because the whole two step query object will be cached later on. - stmt = prepareStatement(c, sqlQry, false); + try { + while (true) { + try { + // Do not cache this statement because the whole two step query object will be cached later on. + stmt = prepareStatement(c, sqlQry, false); - break; - } - catch (SQLException e) { - if (!cachesCreated && e.getErrorCode() == ErrorCode.SCHEMA_NOT_FOUND_1) { - try { - ctx.cache().createMissingCaches(); - } - catch (IgniteCheckedException e1) { - throw new CacheException("Failed to create missing caches.", e); - } + break; + } + catch (SQLException e) { + if (!cachesCreated && e.getErrorCode() == ErrorCode.SCHEMA_NOT_FOUND_1) { + try { + ctx.cache().createMissingCaches(); + } + catch (IgniteCheckedException e1) { + throw new CacheException("Failed to create missing caches.", e); + } - cachesCreated = true; + cachesCreated = true; + } + else + throw new CacheException("Failed to parse query: " + sqlQry, e); } - else - throw new CacheException("Failed to parse query: " + sqlQry, e); } } + finally { + GridH2QueryContext.clearThreadLocal(); + } try { - try { - bindParameters(stmt, F.asList(qry.getArgs())); - } - catch (IgniteCheckedException e) { - throw new CacheException("Failed to bind parameters: [qry=" + sqlQry + ", params=" + - Arrays.deepToString(qry.getArgs()) + "]", e); - } + bindParameters(stmt, F.asList(qry.getArgs())); - try { - twoStepQry = GridSqlQuerySplitter.split( - (JdbcPreparedStatement)stmt, qry.getArgs(), qry.isCollocated(), this); + twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), grpByCollocated, + distributedJoins); - meta = meta(stmt.getMetaData()); + List<Integer> caches; + List<Integer> extraCaches = null; + + // Setup spaces from schemas. + if (!twoStepQry.schemas().isEmpty()) { + Collection<String> spaces = new ArrayList<>(twoStepQry.schemas().size()); + caches = new ArrayList<>(twoStepQry.schemas().size() + 1); + caches.add(cctx.cacheId()); + + for (String schema : twoStepQry.schemas()) { + String space0 = space(schema); + + spaces.add(space0); + + if (!F.eq(space0, space)) { + int cacheId = CU.cacheId(space0); + + caches.add(cacheId); + + if (extraCaches == null) + extraCaches = new ArrayList<>(); + + extraCaches.add(cacheId); + } + } + + twoStepQry.spaces(spaces); } - catch (SQLException e) { - throw new CacheException(e); + else { + caches = Collections.singletonList(cctx.cacheId()); + extraCaches = null; } + + twoStepQry.caches(caches); + twoStepQry.extraCaches(extraCaches); + + meta = meta(stmt.getMetaData()); + } + catch (IgniteCheckedException e) { + throw new CacheException("Failed to bind parameters: [qry=" + sqlQry + ", params=" + + Arrays.deepToString(qry.getArgs()) + "]", e); + } + catch (SQLException e) { + throw new CacheException(e); } finally { U.close(stmt, log); @@ -1104,7 +1204,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { twoStepQry.pageSize(qry.getPageSize()); - QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(queryTwoStep(cctx, twoStepQry, cctx.keepBinary())); + QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( + runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder)); cursor.fieldsMeta(meta); @@ -1117,17 +1218,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * Sets filters for current thread. Must be set to not null value - * before executeQuery and reset to null after in finally block since it signals - * to table that it should return content without expired values. - * - * @param filters Filters. - */ - public void setFilters(@Nullable IndexingQueryFilter filters) { - GridH2IndexBase.setFiltersForThread(filters); - } - - /** * Prepares statement for query. * * @param qry Query string. @@ -1331,7 +1421,18 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2RowDescriptor desc = new RowDescriptor(tbl.type(), schema); - GridH2Table.Engine.createTable(conn, sql.toString(), desc, tbl, tbl.schema.spaceName); + GridH2Table res = GridH2Table.Engine.createTable(conn, sql.toString(), desc, tbl, tbl.schema.spaceName); + + if (dataTables.putIfAbsent(res.identifier(), res) != null) + throw new IllegalStateException("Table already exists: " + res.identifier()); + } + + /** + * @param identifier Table identifier. + * @return Data table. + */ + public GridH2Table dataTable(String identifier) { + return dataTables.get(identifier); } /** @@ -1447,18 +1548,43 @@ public class IgniteH2Indexing implements GridQueryIndexing { tbl.tbl.rebuildIndexes(); } - /** {@inheritDoc} */ - @Override public long size(@Nullable String spaceName, GridQueryTypeDescriptor type, - IndexingQueryFilter filters) throws IgniteCheckedException { + /** + * Gets size (for tests only). + * + * @param spaceName Space name. + * @param type Type descriptor. + * @return Size. + * @throws IgniteCheckedException If failed or {@code -1} if the type is unknown. + */ + long size(@Nullable String spaceName, GridQueryTypeDescriptor type) throws IgniteCheckedException { TableDescriptor tbl = tableDescriptor(spaceName, type); if (tbl == null) return -1; - IgniteSpiCloseableIterator<List<?>> iter = queryFields(spaceName, - "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, null).iterator(); + Connection conn = connectionForSpace(spaceName); + + setupConnection(conn, false, false); + + ResultSet rs = executeSqlQuery(conn, + "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, false); + + try { + if (!rs.next()) + throw new IllegalStateException(); - return ((Number)iter.next().get(0)).longValue(); + return rs.getLong(1); + } + catch (SQLException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * @return Busy lock. + */ + public GridSpinBusyLock busyLock() { + return busyLock; } /** @@ -1481,8 +1607,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (log.isDebugEnabled()) log.debug("Starting cache query index..."); - System.setProperty("h2.serializeJavaObject", "false"); - System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching. + this.busyLock = busyLock; if (SysProperties.serializeJavaObject) { U.warn(log, "Serialization of Java objects in H2 was enabled."); @@ -1490,10 +1615,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { SysProperties.serializeJavaObject = false; } - if (Utils.serializer != null) + if (JdbcUtils.serializer != null) U.warn(log, "Custom H2 serialization is already configured, will override."); - Utils.serializer = h2Serializer(); + JdbcUtils.serializer = h2Serializer(); String dbName = (ctx != null ? ctx.localNodeId() : UUID.randomUUID()).toString(); @@ -1522,8 +1647,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { throw new IgniteCheckedException(e); } - if (ctx == null) // This is allowed in some tests. + if (ctx == null) { + // This is allowed in some tests. + nodeId = UUID.randomUUID(); marshaller = new JdkMarshaller(); + } else { this.ctx = ctx; @@ -1548,9 +1676,92 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @param topic Topic. + * @param topicOrd Topic ordinal for {@link GridTopic}. + * @param nodes Nodes. + * @param msg Message. + * @param specialize Optional closure to specialize message for each node. + * @param locNodeHnd Handler for local node. + * @param plc Policy identifying the executor service which will process message. + * @param runLocParallel Run local handler in parallel thread. + * @return {@code true} If all messages sent successfully. + */ + public boolean send( + Object topic, + int topicOrd, + Collection<ClusterNode> nodes, + Message msg, + @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize, + @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHnd, + byte plc, + boolean runLocParallel + ) { + boolean ok = true; + + if (specialize == null && msg instanceof GridCacheQueryMarshallable) + ((GridCacheQueryMarshallable)msg).marshall(marshaller); + + ClusterNode locNode = null; + + for (ClusterNode node : nodes) { + if (node.isLocal()) { + locNode = node; + + continue; + } + + try { + if (specialize != null) { + msg = specialize.apply(node, msg); + + if (msg instanceof GridCacheQueryMarshallable) + ((GridCacheQueryMarshallable)msg).marshall(marshaller); + } + + ctx.io().send(node, topic, topicOrd, msg, plc); + } + catch (IgniteCheckedException e) { + ok = false; + + U.warn(log, "Failed to send message [node=" + node + ", msg=" + msg + + ", errMsg=" + e.getMessage() + "]"); + } + } + + // Local node goes the last to allow parallel execution. + if (locNode != null) { + if (specialize != null) + msg = specialize.apply(locNode, msg); + + if (runLocParallel) { + final ClusterNode finalLocNode = locNode; + final Message finalMsg = msg; + + try { + // We prefer runLocal to runLocalSafe, because the latter can produce deadlock here. + ctx.closure().runLocal(new GridPlainRunnable() { + @Override public void run() { + locNodeHnd.apply(finalLocNode, finalMsg); + } + }, plc).listen(logger); + } + catch (IgniteCheckedException e) { + ok = false; + + U.error(log, "Failed to execute query locally.", e); + } + } + else + locNodeHnd.apply(locNode, msg); + } + + return ok; + } + + /** * @return Serializer. */ - protected JavaObjectSerializer h2Serializer() { + private JavaObjectSerializer h2Serializer() { return new JavaObjectSerializer() { @Override public byte[] serialize(Object obj) throws Exception { return marshaller.marshal(obj); @@ -1607,8 +1818,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { for (Schema schema : schemas.values()) { for (TableDescriptor desc : schema.tbls.values()) { - desc.tbl.close(); - if (desc.luceneIdx != null) U.closeQuiet(desc.luceneIdx); } @@ -1632,18 +1841,19 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (stmtCacheCleanupTask != null) stmtCacheCleanupTask.close(); + GridH2QueryContext.clearLocalNodeStop(nodeId); + if (log.isDebugEnabled()) log.debug("Cache query index stopped."); } /** {@inheritDoc} */ - @Override public void registerCache(CacheConfiguration<?,?> ccfg) throws IgniteCheckedException { + @Override public void registerCache(GridCacheContext<?, ?> cctx, CacheConfiguration<?,?> ccfg) + throws IgniteCheckedException { String schema = schemaNameFromCacheConf(ccfg); - if (schemas.putIfAbsent(schema, new Schema(ccfg.getName(), schema, - ccfg.getOffHeapMaxMemory() >= 0 || ccfg.getMemoryMode() == CacheMemoryMode.OFFHEAP_TIERED ? - new GridUnsafeMemory(0) : null, ccfg)) != null) - throw new IgniteCheckedException("Schema for cache already registered: " + U.maskName(ccfg.getName())); + if (schemas.putIfAbsent(schema, new Schema(ccfg.getName(), schema, cctx, ccfg)) != null) + throw new IgniteCheckedException("Cache already registered: " + U.maskName(ccfg.getName())); space2schema.put(emptyIfNull(ccfg.getName()), schema); @@ -1661,6 +1871,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { space2schema.remove(emptyIfNull(rmv.spaceName)); mapQryExec.onCacheStop(ccfg.getName()); + rmv.onDrop(); + try { dropSchema(schema); } @@ -1668,11 +1880,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { U.error(log, "Failed to drop schema on cache stop (will ignore): " + U.maskName(ccfg.getName()), e); } - for (Iterator<Map.Entry<T3<String, String, Boolean>, TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator(); + for (Iterator<Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator(); it.hasNext();) { - Map.Entry<T3<String, String, Boolean>, TwoStepCachedQuery> e = it.next(); + Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery> e = it.next(); - if (F.eq(e.getKey().get1(), ccfg.getName())) + if (F.eq(e.getKey().space, ccfg.getName())) it.remove(); } } @@ -1680,7 +1892,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public IndexingQueryFilter backupFilter( - @Nullable final List<String> caches, @Nullable final AffinityTopologyVersion topVer, @Nullable final int[] parts ) { @@ -1735,6 +1946,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { @Override public boolean isValueRequired() { return false; } + + @Override public String toString() { + return "IndexingQueryFilter [ver=" + topVer + ']'; + } }; } @@ -1762,6 +1977,81 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * Key for cached two-step query. + */ + private static final class TwoStepCachedQueryKey { + /** */ + private final String space; + + /** */ + private final String sql; + + /** */ + private final boolean grpByCollocated; + + /** */ + private final boolean distributedJoins; + + /** */ + private final boolean enforceJoinOrder; + + /** + * @param space Space. + * @param sql Sql. + * @param grpByCollocated Collocated GROUP BY. + * @param distributedJoins Distributed joins enabled. + * @param enforceJoinOrder Enforce join order of tables. + */ + private TwoStepCachedQueryKey(String space, + String sql, + boolean grpByCollocated, + boolean distributedJoins, + boolean enforceJoinOrder) { + this.space = space; + this.sql = sql; + this.grpByCollocated = grpByCollocated; + this.distributedJoins = distributedJoins; + this.enforceJoinOrder = enforceJoinOrder; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TwoStepCachedQueryKey that = (TwoStepCachedQueryKey)o; + + if (grpByCollocated != that.grpByCollocated) + return false; + + if (distributedJoins != that.distributedJoins) + return false; + + if (enforceJoinOrder != that.enforceJoinOrder) + return false; + + if (space != null ? !space.equals(that.space) : that.space != null) + return false; + + return sql.equals(that.sql); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = space != null ? space.hashCode() : 0; + res = 31 * res + sql.hashCode(); + res = 31 * res + (grpByCollocated ? 1 : 0); + res = 31 * res + (distributedJoins ? 1 : 0); + res = 31 * res + (enforceJoinOrder ? 1 : 0); + + return res; + } + } + + /** * Cached two-step query. */ private static final class TwoStepCachedQuery { @@ -1787,6 +2077,47 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @param c1 First column. + * @param c2 Second column. + * @return {@code true} If they are the same. + */ + private static boolean equal(IndexColumn c1, IndexColumn c2) { + return c1.column.getColumnId() == c2.column.getColumnId(); + } + + /** + * @param cols Columns list. + * @param col Column to find. + * @return {@code true} If found. + */ + private static boolean containsColumn(List<IndexColumn> cols, IndexColumn col) { + for (int i = cols.size() - 1; i >= 0; i--) { + if (equal(cols.get(i), col)) + return true; + } + + return false; + } + + /** + * @param cols Columns list. + * @param keyCol Primary key column. + * @param affCol Affinity key column. + * @return The same list back. + */ + private static List<IndexColumn> treeIndexColumns(List<IndexColumn> cols, IndexColumn keyCol, IndexColumn affCol) { + assert keyCol != null; + + if (!containsColumn(cols, keyCol)) + cols.add(keyCol); + + if (affCol != null && !containsColumn(cols, affCol)) + cols.add(affCol); + + return cols; + } + + /** * Wrapper to store connection and flag is schema set or not. */ private static class ConnectionWrapper { @@ -2033,7 +2364,15 @@ public class IgniteH2Indexing implements GridQueryIndexing { ArrayList<Index> idxs = new ArrayList<>(); - idxs.add(new GridH2TreeIndex("_key_PK", tbl, true, KEY_COL, VAL_COL, tbl.indexColumn(0, ASCENDING))); + IndexColumn keyCol = tbl.indexColumn(KEY_COL, SortOrder.ASCENDING); + IndexColumn affCol = tbl.getAffinityKeyColumn(); + + if (affCol != null && equal(affCol, keyCol)) + affCol = null; + + // Add primary key index. + idxs.add(new GridH2TreeIndex("_key_PK", tbl, true, + treeIndexColumns(new ArrayList<IndexColumn>(2), keyCol, affCol))); if (type().valueClass() == String.class) { try { @@ -2044,6 +2383,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } + boolean affIdxFound = false; + for (Map.Entry<String, GridQueryIndexDescriptor> e : type.indexes().entrySet()) { String name = e.getKey(); GridQueryIndexDescriptor idx = e.getValue(); @@ -2057,30 +2398,40 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } else { - IndexColumn[] cols = new IndexColumn[idx.fields().size()]; - - int i = 0; + List<IndexColumn> cols = new ArrayList<>(idx.fields().size() + 2); boolean escapeAll = schema.escapeAll(); for (String field : idx.fields()) { - // H2 reserved keywords used as column name is case sensitive. String fieldName = escapeAll ? field : escapeName(field, false).toUpperCase(); Column col = tbl.getColumn(fieldName); - cols[i++] = tbl.indexColumn(col.getColumnId(), idx.descending(field) ? DESCENDING : ASCENDING); + cols.add(tbl.indexColumn(col.getColumnId(), + idx.descending(field) ? SortOrder.DESCENDING : SortOrder.ASCENDING)); } - if (idx.type() == SORTED) - idxs.add(new GridH2TreeIndex(name, tbl, false, KEY_COL, VAL_COL, cols)); + if (idx.type() == SORTED) { + // We don't care about number of fields in affinity index, just affinity key must be the first. + affIdxFound |= affCol != null && equal(cols.get(0), affCol); + + cols = treeIndexColumns(cols, keyCol, affCol); + + idxs.add(new GridH2TreeIndex(name, tbl, false, cols)); + } else if (idx.type() == GEO_SPATIAL) - idxs.add(createH2SpatialIndex(tbl, name, cols, KEY_COL, VAL_COL)); + idxs.add(createH2SpatialIndex(tbl, name, cols.toArray(new IndexColumn[cols.size()]))); else - throw new IllegalStateException(); + throw new IllegalStateException("Index type: " + idx.type()); } } + // Add explicit affinity key index if nothing alike was found. + if (affCol != null && !affIdxFound) { + idxs.add(new GridH2TreeIndex("AFFINITY_KEY", tbl, false, + treeIndexColumns(new ArrayList<IndexColumn>(2), affCol, keyCol))); + } + return idxs; } @@ -2088,15 +2439,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param tbl Table. * @param idxName Index name. * @param cols Columns. - * @param keyCol Key column. - * @param valCol Value column. */ private SpatialIndex createH2SpatialIndex( Table tbl, String idxName, - IndexColumn[] cols, - int keyCol, - int valCol + IndexColumn[] cols ) { String className = "org.apache.ignite.internal.processors.query.h2.opt.GridH2SpatialIndex"; @@ -2106,14 +2453,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { Constructor<?> ctor = cls.getConstructor( Table.class, String.class, - IndexColumn[].class, - int.class, - int.class); + IndexColumn[].class); if (!ctor.isAccessible()) ctor.setAccessible(true); - return (SpatialIndex)ctor.newInstance(tbl, idxName, cols, keyCol, valCol); + return (SpatialIndex)ctor.newInstance(tbl, idxName, cols); } catch (Exception e) { throw new IgniteException("Failed to instantiate: " + className, e); @@ -2162,6 +2507,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override protected IgniteBiTuple<K, V> createRow() { K key = (K)row[0]; V val = (V)row[1]; @@ -2256,10 +2602,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** * Database schema object. */ - private static class Schema { - /** */ - private static final long serialVersionUID = 0L; - + private class Schema { /** */ private final String spaceName; @@ -2276,23 +2619,33 @@ public class IgniteH2Indexing implements GridQueryIndexing { private final CacheLongKeyLIRS<GridH2KeyValueRowOffheap> rowCache; /** */ + private final GridCacheContext<?,?> cctx; + + /** */ private final CacheConfiguration<?,?> ccfg; /** * @param spaceName Space name. * @param schemaName Schema name. - * @param offheap Offheap memory. + * @param cctx Cache context. * @param ccfg Cache configuration. */ - private Schema(@Nullable String spaceName, String schemaName, GridUnsafeMemory offheap, CacheConfiguration<?,?> ccfg) { + private Schema(String spaceName, String schemaName, GridCacheContext<?,?> cctx, CacheConfiguration<?,?> ccfg) { this.spaceName = spaceName; + this.cctx = cctx; this.schemaName = schemaName; - this.offheap = offheap; this.ccfg = ccfg; - if (offheap != null) - rowCache = new CacheLongKeyLIRS<>(ccfg.getSqlOnheapRowCacheSize(), 1, 128, 256); - else + offheap = ccfg.getOffHeapMaxMemory() >= 0 || ccfg.getMemoryMode() == CacheMemoryMode.OFFHEAP_TIERED ? + new GridUnsafeMemory(0) : null; + + if (offheap != null) { + CacheLongKeyLIRS.Config lirsCfg = new CacheLongKeyLIRS.Config(); + + lirsCfg.maxMemory = ccfg.getSqlOnheapRowCacheSize(); + + rowCache = new CacheLongKeyLIRS<>(lirsCfg); + } else rowCache = null; } @@ -2310,6 +2663,19 @@ public class IgniteH2Indexing implements GridQueryIndexing { public boolean escapeAll() { return ccfg.isSqlEscapeAll(); } + + /** + * Called after the schema was dropped. + */ + public void onDrop() { + for (TableDescriptor tblDesc : tbls.values()) { + GridH2Table tbl = tblDesc.tbl; + + dataTables.remove(tbl.identifier(), tbl); + + tbl.destroy(); + } + } } /** @@ -2390,6 +2756,26 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ + @Override public IgniteH2Indexing indexing() { + return IgniteH2Indexing.this; + } + + /** {@inheritDoc} */ + @Override public GridQueryTypeDescriptor type() { + return type; + } + + /** {@inheritDoc} */ + @Override public GridCacheContext<?,?> context() { + return schema.cctx; + } + + /** {@inheritDoc} */ + @Override public CacheConfiguration configuration() { + return schema.ccfg; + } + + /** {@inheritDoc} */ @Override public GridUnsafeGuard guard() { return guard; } @@ -2414,11 +2800,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public IgniteH2Indexing owner() { - return IgniteH2Indexing.this; - } - - /** {@inheritDoc} */ @Override public Value wrap(Object obj, int type) throws IgniteCheckedException { assert obj != null; @@ -2457,7 +2838,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (obj instanceof java.util.Date && !(obj instanceof Timestamp)) obj = new Timestamp(((java.util.Date) obj).getTime()); - return GridH2Utils.toValueTimestamp((Timestamp)obj); + return ValueTimestamp.get((Timestamp)obj); case Value.DECIMAL: return ValueDecimal.get((BigDecimal)obj); case Value.STRING: @@ -2491,7 +2872,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { throws IgniteCheckedException { try { if (val == null) // Only can happen for remove operation, can create simple search row. - return new GridH2Row(wrap(key, keyType), null); + return GridH2RowFactory.create(wrap(key, keyType)); return schema.offheap == null ? new GridH2KeyValueRowOnheap(this, key, keyType, val, valType, expirationTime) : http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java index fe6851d..284f8c5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java @@ -55,6 +55,15 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") protected long expirationTime; + /** */ + private Value key; + + /** */ + private volatile Value val; + + /** */ + private Value[] valCache; + /** * Constructor. * @@ -68,8 +77,6 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { */ protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, int valType, long expirationTime) throws IgniteCheckedException { - super(null, null); - setValue(KEY_COL, desc.wrap(key, keyType)); if (val != null) // We remove by key only, so value can be null here. @@ -79,14 +86,17 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { this.expirationTime = expirationTime; } + /** {@inheritDoc} */ + @Override public Value[] getValueList() { + throw new UnsupportedOperationException(); + } + /** * Protected constructor for {@link GridH2KeyValueRowOffheap} * * @param desc Row descriptor. */ protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc) { - super(new Value[DEFAULT_COLUMNS_COUNT]); - this.desc = desc; } @@ -184,11 +194,20 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { * @return Value if exists. */ protected final Value peekValue(int col) { - return getValueList()[col]; + return col == KEY_COL ? key : val; } /** {@inheritDoc} */ @Override public Value getValue(int col) { + Value[] vCache = valCache; + + if (vCache != null) { + Value v = vCache[col]; + + if (v != null) + return v; + } + if (col < DEFAULT_COLUMNS_COUNT) { Value v; @@ -288,15 +307,35 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { Object res = desc.columnValue(key.getObject(), val.getObject(), col); - if (res == null) - return ValueNull.INSTANCE; + Value v; - try { - return desc.wrap(res, desc.fieldType(col)); + if (res == null) + v = ValueNull.INSTANCE; + else { + try { + v = desc.wrap(res, desc.fieldType(col)); + } + catch (IgniteCheckedException e) { + throw DbException.convert(e); + } } - catch (IgniteCheckedException e) { - throw DbException.convert(e); + + if (vCache != null) + vCache[col + DEFAULT_COLUMNS_COUNT] = v; + + return v; + } + + /** + * @param valCache Value cache. + */ + public void valuesCache(Value[] valCache) { + if (valCache != null) { + valCache[KEY_COL] = key; + valCache[VAL_COL] = val; } + + this.valCache = valCache; } /** @@ -467,6 +506,17 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { } /** {@inheritDoc} */ + @Override public void setValue(int idx, Value v) { + if (idx == VAL_COL) + val = v; + else { + assert idx == KEY_COL : idx + " " + v; + + key = v; + } + } + + /** {@inheritDoc} */ @Override public final int hashCode() { throw new IllegalStateException(); }
