ignite-1232 Distributed SQL joins implementation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/68891e89 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/68891e89 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/68891e89 Branch: refs/heads/ignite-3553 Commit: 68891e89dd0e0f19321d6a4d45ae7372279b8b08 Parents: eddd0a9 Author: sboikov <[email protected]> Authored: Fri Jul 22 17:07:58 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Jul 22 17:08:03 2016 +0300 ---------------------------------------------------------------------- .../examples/datagrid/CacheQueryExample.java | 78 +- .../ignite/codegen/MessageCodeGenerator.java | 6 + .../ignite/cache/query/SqlFieldsQuery.java | 53 + .../org/apache/ignite/cache/query/SqlQuery.java | 25 + .../managers/communication/GridIoManager.java | 32 +- .../communication/GridIoMessageFactory.java | 2 +- .../managers/communication/GridIoPolicy.java | 5 +- .../cache/GridCacheAffinityManager.java | 2 +- .../processors/cache/GridCacheContext.java | 19 + .../GridCacheDefaultAffinityKeyMapper.java | 19 + .../processors/cache/IgniteCacheProxy.java | 8 + .../binary/CacheObjectBinaryProcessorImpl.java | 8 + .../dht/GridDhtPartitionsReservation.java | 3 +- .../cache/query/GridCacheQueryManager.java | 47 +- .../cache/query/GridCacheQueryMarshallable.java | 37 + .../cache/query/GridCacheSqlQuery.java | 33 +- .../cache/query/GridCacheTwoStepQuery.java | 123 +- .../cacheobject/IgniteCacheObjectProcessor.java | 6 + .../IgniteCacheObjectProcessorImpl.java | 5 + .../processors/closure/GridClosurePolicy.java | 51 - .../closure/GridClosureProcessor.java | 52 +- .../processors/query/GridQueryIndexing.java | 51 +- .../processors/query/GridQueryProcessor.java | 192 +- .../query/GridQueryTypeDescriptor.java | 7 + .../messages/GridQueryCancelRequest.java | 2 +- .../twostep/messages/GridQueryFailResponse.java | 2 +- .../messages/GridQueryNextPageRequest.java | 2 +- 
.../messages/GridQueryNextPageResponse.java | 12 +- .../h2/twostep/messages/GridQueryRequest.java | 28 +- .../ignite/internal/util/IgniteUtils.java | 13 + .../ignite/internal/util/lang/GridFunc.java | 20 + .../offheap/unsafe/GridOffHeapSnapTreeMap.java | 91 +- .../jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java | 2 +- .../junits/common/GridCommonAbstractTest.java | 39 +- .../query/h2/opt/GridH2SpatialIndex.java | 74 +- .../processors/query/h2/IgniteH2Indexing.java | 709 ++++++-- .../query/h2/opt/GridH2AbstractKeyValueRow.java | 72 +- .../query/h2/opt/GridH2CollocationModel.java | 783 +++++++++ .../processors/query/h2/opt/GridH2Cursor.java | 36 +- .../query/h2/opt/GridH2DefaultTableEngine.java | 38 + .../query/h2/opt/GridH2IndexBase.java | 1392 ++++++++++++++- .../query/h2/opt/GridH2MetaTable.java | 383 ++++ .../query/h2/opt/GridH2QueryContext.java | 612 +++++++ .../query/h2/opt/GridH2QueryType.java | 49 + .../query/h2/opt/GridH2RetryException.java | 32 + .../processors/query/h2/opt/GridH2Row.java | 86 +- .../query/h2/opt/GridH2RowDescriptor.java | 28 +- .../query/h2/opt/GridH2RowFactory.java | 179 ++ .../processors/query/h2/opt/GridH2Table.java | 372 ++-- .../query/h2/opt/GridH2TreeIndex.java | 142 +- .../processors/query/h2/opt/GridH2Utils.java | 133 -- .../query/h2/opt/GridH2ValueCacheObject.java | 3 +- .../query/h2/opt/GridLuceneIndex.java | 7 +- .../processors/query/h2/sql/GridSqlAlias.java | 12 + .../processors/query/h2/sql/GridSqlColumn.java | 22 +- .../processors/query/h2/sql/GridSqlElement.java | 11 + .../query/h2/sql/GridSqlOperation.java | 2 +- .../query/h2/sql/GridSqlOperationType.java | 8 +- .../query/h2/sql/GridSqlQueryParser.java | 97 +- .../query/h2/sql/GridSqlQuerySplitter.java | 293 +++- .../processors/query/h2/sql/GridSqlSelect.java | 9 +- .../processors/query/h2/sql/GridSqlTable.java | 70 + .../query/h2/twostep/GridMapQueryExecutor.java | 415 +++-- .../query/h2/twostep/GridMergeIndex.java | 75 +- .../h2/twostep/GridMergeIndexUnsorted.java | 6 +- 
.../query/h2/twostep/GridMergeTable.java | 4 +- .../h2/twostep/GridReduceQueryExecutor.java | 338 ++-- .../query/h2/twostep/GridThreadLocalTable.java | 68 +- .../query/h2/twostep/msg/GridH2Array.java | 9 +- .../query/h2/twostep/msg/GridH2Boolean.java | 10 +- .../query/h2/twostep/msg/GridH2Byte.java | 9 +- .../query/h2/twostep/msg/GridH2Bytes.java | 11 +- .../query/h2/twostep/msg/GridH2CacheObject.java | 9 +- .../query/h2/twostep/msg/GridH2Date.java | 9 +- .../query/h2/twostep/msg/GridH2Decimal.java | 11 +- .../query/h2/twostep/msg/GridH2Double.java | 9 +- .../query/h2/twostep/msg/GridH2Float.java | 9 +- .../query/h2/twostep/msg/GridH2Geometry.java | 11 +- .../h2/twostep/msg/GridH2IndexRangeRequest.java | 208 +++ .../twostep/msg/GridH2IndexRangeResponse.java | 279 +++ .../query/h2/twostep/msg/GridH2Integer.java | 20 +- .../query/h2/twostep/msg/GridH2JavaObject.java | 11 +- .../query/h2/twostep/msg/GridH2Long.java | 9 +- .../query/h2/twostep/msg/GridH2Null.java | 15 +- .../h2/twostep/msg/GridH2QueryRequest.java | 401 +++++ .../query/h2/twostep/msg/GridH2RowMessage.java | 116 ++ .../query/h2/twostep/msg/GridH2RowRange.java | 181 ++ .../h2/twostep/msg/GridH2RowRangeBounds.java | 188 ++ .../query/h2/twostep/msg/GridH2Short.java | 9 +- .../query/h2/twostep/msg/GridH2String.java | 9 +- .../query/h2/twostep/msg/GridH2Time.java | 9 +- .../query/h2/twostep/msg/GridH2Timestamp.java | 11 +- .../query/h2/twostep/msg/GridH2Uuid.java | 9 +- .../h2/twostep/msg/GridH2ValueMessage.java | 2 +- .../twostep/msg/GridH2ValueMessageFactory.java | 22 +- ...idCacheReduceQueryMultithreadedSelfTest.java | 168 -- .../cache/IgniteCacheAbstractQuerySelfTest.java | 30 +- .../IgniteCacheCrossCacheJoinRandomTest.java | 442 +++++ ...acheDistributedJoinCollocatedAndNotTest.java | 365 ++++ ...acheDistributedJoinCustomAffinityMapper.java | 262 +++ .../IgniteCacheDistributedJoinNoIndexTest.java | 299 ++++ ...ributedJoinPartitionedAndReplicatedTest.java | 487 ++++++ 
...CacheDistributedJoinQueryConditionsTest.java | 624 +++++++ .../cache/IgniteCacheDistributedJoinTest.java | 316 ++++ ...PartitionedAndReplicatedCollocationTest.java | 399 +++++ ...teCacheJoinPartitionedAndReplicatedTest.java | 78 +- ...IgniteCacheJoinQueryWithAffinityKeyTest.java | 646 +++++++ .../cache/IgniteCacheQueryLoadSelfTest.java | 12 +- .../cache/IgniteCrossCachesJoinsQueryTest.java | 1641 ++++++++++++++++++ ...QueryNodeRestartDistributedJoinSelfTest.java | 476 +++++ ...dCacheAbstractReduceFieldsQuerySelfTest.java | 420 ----- ...ridCacheReduceFieldsQueryAtomicSelfTest.java | 38 - ...GridCacheReduceFieldsQueryLocalSelfTest.java | 37 - ...cheReduceFieldsQueryPartitionedSelfTest.java | 59 - ...acheReduceFieldsQueryReplicatedSelfTest.java | 37 - .../query/IgniteSqlSchemaIndexingTest.java | 5 +- .../query/IgniteSqlSplitterSelfTest.java | 833 ++++++++- .../h2/GridIndexingSpiAbstractSelfTest.java | 145 +- .../query/h2/opt/GridH2TableSelfTest.java | 10 +- .../h2/sql/AbstractH2CompareQueryTest.java | 163 +- .../query/h2/sql/GridQueryParsingTest.java | 17 +- .../H2CompareBigQueryDistributedJoinsTest.java | 28 + .../query/h2/sql/H2CompareBigQueryTest.java | 119 +- .../IgniteCacheQuerySelfTestSuite.java | 25 +- .../IgniteCacheQuerySelfTestSuite2.java | 10 - .../ignite/yardstick/IgniteBenchmarkUtils.java | 45 + .../IgniteSqlQueryDistributedJoinBenchmark.java | 184 ++ ...lQueryDistributedJoinBroadcastBenchmark.java | 28 + parent/pom.xml | 24 +- 129 files changed, 15201 insertions(+), 2502 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java index 
9200489..85d74e0 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java @@ -46,8 +46,11 @@ import org.apache.ignite.lang.IgniteBiPredicate; * limitations (not applied if data is queried from one node only): * <ul> * <li> - * Joins will work correctly only if joined objects are stored in + * Non-distributed joins will work correctly only if joined objects are stored in * collocated mode. Refer to {@link AffinityKey} javadoc for more details. + * <p> + * To use distributed joins it is necessary to set query 'distributedJoin' flag using + * {@link SqlFieldsQuery#setDistributedJoins(boolean)} or {@link SqlQuery#setDistributedJoins(boolean)}. * </li> * <li> * Note that if you created query on to replicated cache, all data will @@ -65,6 +68,9 @@ public class CacheQueryExample { /** Organizations cache name. */ private static final String ORG_CACHE = CacheQueryExample.class.getSimpleName() + "Organizations"; + /** Persons collocated with Organizations cache name. */ + private static final String COLLOCATED_PERSON_CACHE = CacheQueryExample.class.getSimpleName() + "CollocatedPersons"; + /** Persons cache name. */ private static final String PERSON_CACHE = CacheQueryExample.class.getSimpleName() + "Persons"; @@ -84,14 +90,20 @@ public class CacheQueryExample { orgCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default. orgCacheCfg.setIndexedTypes(Long.class, Organization.class); + CacheConfiguration<AffinityKey<Long>, Person> colPersonCacheCfg = new CacheConfiguration<>(COLLOCATED_PERSON_CACHE); + + colPersonCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default. + colPersonCacheCfg.setIndexedTypes(AffinityKey.class, Person.class); + CacheConfiguration<AffinityKey<Long>, Person> personCacheCfg = new CacheConfiguration<>(PERSON_CACHE); personCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default. 
- personCacheCfg.setIndexedTypes(AffinityKey.class, Person.class); + personCacheCfg.setIndexedTypes(Long.class, Person.class); // Auto-close cache at the end of the example. try ( IgniteCache<Long, Organization> orgCache = ignite.getOrCreateCache(orgCacheCfg); + IgniteCache<AffinityKey<Long>, Person> colPersonCache = ignite.getOrCreateCache(colPersonCacheCfg); IgniteCache<AffinityKey<Long>, Person> personCache = ignite.getOrCreateCache(personCacheCfg) ) { // Populate cache. @@ -103,9 +115,12 @@ public class CacheQueryExample { // Example for SQL-based querying employees based on salary ranges. sqlQuery(); - // Example for SQL-based querying employees for a given organization (includes SQL join). + // Example for SQL-based querying employees for a given organization (includes SQL join for collocated objects). sqlQueryWithJoin(); + // Example for SQL-based querying employees for a given organization (includes distributed SQL join). + sqlQueryWithDistributedJoin(); + // Example for TEXT-based querying for a given string in peoples resumes. textQuery(); @@ -121,6 +136,7 @@ public class CacheQueryExample { } finally { // Distributed cache could be removed from cluster only by #destroyCache() call. + ignite.destroyCache(COLLOCATED_PERSON_CACHE); ignite.destroyCache(PERSON_CACHE); ignite.destroyCache(ORG_CACHE); } @@ -133,7 +149,7 @@ public class CacheQueryExample { * Example for scan query based on a predicate. */ private static void scanQuery() { - IgniteCache<BinaryObject, BinaryObject> cache = Ignition.ignite().cache(PERSON_CACHE).withKeepBinary(); + IgniteCache<BinaryObject, BinaryObject> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE).withKeepBinary(); ScanQuery<BinaryObject, BinaryObject> scan = new ScanQuery<>( new IgniteBiPredicate<BinaryObject, BinaryObject>() { @@ -170,7 +186,7 @@ public class CacheQueryExample { * Example for SQL queries based on all employees working for a specific organization. 
*/ private static void sqlQueryWithJoin() { - IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE); + IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE); // SQL clause query which joins on 2 types to select people for a specific organization. String joinSql = @@ -189,6 +205,32 @@ public class CacheQueryExample { } /** + * Example for SQL queries based on all employees working for a specific organization (query uses distributed join). + */ + private static void sqlQueryWithDistributedJoin() { + IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE); + + // SQL clause query which joins on 2 types to select people for a specific organization. + String joinSql = + "from Person, \"" + ORG_CACHE + "\".Organization as org " + + "where Person.orgId = org.id " + + "and lower(org.name) = lower(?)"; + + SqlQuery qry = new SqlQuery<AffinityKey<Long>, Person>(Person.class, joinSql). + setArgs("ApacheIgnite"); + + // Enable distributed joins for query. + qry.setDistributedJoins(true); + + // Execute queries for find employees for different organizations. + print("Following people are 'ApacheIgnite' employees (distributed join): ", cache.query(qry).getAll()); + + qry.setArgs("Other"); + + print("Following people are 'Other' employees (distributed join): ", cache.query(qry).getAll()); + } + + /** * Example for TEXT queries using LUCENE-based indexing of people's resumes. */ private static void textQuery() { @@ -210,7 +252,7 @@ public class CacheQueryExample { * Example for SQL queries to calculate average salary for a specific organization. */ private static void sqlQueryWithAggregation() { - IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE); + IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE); // Calculate average of salary of all persons in ApacheIgnite. 
// Note that we also join on Organization cache as well. @@ -249,7 +291,7 @@ public class CacheQueryExample { * fields instead of whole key-value pairs. */ private static void sqlFieldsQueryWithJoin() { - IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE); + IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE); // Execute query to get names of all employees. String sql = @@ -263,7 +305,7 @@ public class CacheQueryExample { List<List<?>> res = cursor.getAll(); // Print persons' names and organizations' names. - print("Names of all employees and organizations they belong to:", res); + print("Names of all employees and organizations they belong to: ", res); } /** @@ -282,9 +324,11 @@ public class CacheQueryExample { orgCache.put(org1.id(), org1); orgCache.put(org2.id(), org2); - IgniteCache<AffinityKey<Long>, Person> personCache = Ignition.ignite().cache(PERSON_CACHE); + IgniteCache<AffinityKey<Long>, Person> colPersonCache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE); + IgniteCache<Long, Person> personCache = Ignition.ignite().cache(PERSON_CACHE); - // Clear cache before running the example. + // Clear caches before running the example. + colPersonCache.clear(); personCache.clear(); // People. @@ -295,10 +339,16 @@ public class CacheQueryExample { // Note that in this example we use custom affinity key for Person objects // to ensure that all persons are collocated with their organizations. - personCache.put(p1.key(), p1); - personCache.put(p2.key(), p2); - personCache.put(p3.key(), p3); - personCache.put(p4.key(), p4); + colPersonCache.put(p1.key(), p1); + colPersonCache.put(p2.key(), p2); + colPersonCache.put(p3.key(), p3); + colPersonCache.put(p4.key(), p4); + + // These Person objects are not collocated with their organizations. 
+ personCache.put(p1.id, p1); + personCache.put(p2.id, p2); + personCache.put(p3.id, p3); + personCache.put(p4.id, p4); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index a6ae0da..f0ba5ce 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -223,6 +223,12 @@ public class MessageCodeGenerator { // gen.generateAndWrite(GridH2Uuid.class); // gen.generateAndWrite(GridH2Geometry.class); // gen.generateAndWrite(GridH2CacheObject.class); +// gen.generateAndWrite(GridH2IndexRangeRequest.class); +// gen.generateAndWrite(GridH2IndexRangeResponse.class); +// gen.generateAndWrite(GridH2RowRange.class); +// gen.generateAndWrite(GridH2RowRangeBounds.class); +// gen.generateAndWrite(GridH2QueryRequest.class); +// gen.generateAndWrite(GridH2RowMessage.class); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java index b2dd181..48dab6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@ -56,6 +56,12 @@ public final class SqlFieldsQuery extends Query<List<?>> { /** Collocation flag. 
*/ private boolean collocated; + /** */ + private boolean enforceJoinOrder; + + /** */ + private boolean distributedJoins; + /** * Constructs SQL fields query. * @@ -141,6 +147,53 @@ public final class SqlFieldsQuery extends Query<List<?>> { return this; } + /** + * Checks if join order of tables is enforced. + * + * @return Flag value. + */ + public boolean isEnforceJoinOrder() { + return enforceJoinOrder; + } + + /** + * Sets flag to enforce join order of tables in the query. If set to {@code true} + * query optimizer will not reorder tables in join. By default is {@code false}. + * <p> + * It is not recommended to enable this property until you are sure that + * your indexes and the query itself are correct and tuned as much as possible but + * query optimizer still produces wrong join order. + * + * @param enforceJoinOrder Flag value. + * @return {@code this} For chaining. + */ + public SqlFieldsQuery setEnforceJoinOrder(boolean enforceJoinOrder) { + this.enforceJoinOrder = enforceJoinOrder; + + return this; + } + + /** + * Specify if distributed joins are enabled for this query. + * + * @param distributedJoins Distributed joins enabled. + * @return {@code this} For chaining. + */ + public SqlFieldsQuery setDistributedJoins(boolean distributedJoins) { + this.distributedJoins = distributedJoins; + + return this; + } + + /** + * Check if distributed joins are enabled for this query. + * + * @return {@code true} If distributed joins are enabled. 
+ */ + public boolean isDistributedJoins() { + return distributedJoins; + } + /** {@inheritDoc} */ @Override public SqlFieldsQuery setPageSize(int pageSize) { return (SqlFieldsQuery)super.setPageSize(pageSize); http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java index be3b390..e05ff13 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java @@ -43,6 +43,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> { @GridToStringInclude private Object[] args; + /** */ + private boolean distributedJoins; + /** * Constructs query for the given type name and SQL query. * @@ -142,11 +145,33 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> { /** * @param type Type. + * @return {@code this} For chaining. */ public SqlQuery setType(Class<?> type) { return setType(GridQueryProcessor.typeName(type)); } + /** + * Specify if distributed joins are enabled for this query. + * + * @param distributedJoins Distributed joins enabled. + * @return {@code this} For chaining. + */ + public SqlQuery setDistributedJoins(boolean distributedJoins) { + this.distributedJoins = distributedJoins; + + return this; + } + + /** + * Check if distributed joins are enabled for this query. + * + * @return {@code true} If distributed joins are enabled. 
+ */ + public boolean isDistributedJoins() { + return distributedJoins; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SqlQuery.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 4bc2eea..99cb7f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -88,6 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL; @@ -157,6 +158,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** IGFS pool. */ private ExecutorService igfsPool; + /** Index pool. */ + private ExecutorService idxPool; + /** Discovery listener. 
*/ private GridLocalEventListener discoLsnr; @@ -268,6 +272,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa 0, new LinkedBlockingQueue<Runnable>()); + if (IgniteComponentType.INDEXING.inClassPath()) { + int cpus = Runtime.getRuntime().availableProcessors(); + + idxPool = new IgniteThreadPoolExecutor("idx", ctx.gridName(), + cpus, cpus * 2, 3000L, new LinkedBlockingQueue<Runnable>(1000)); + } + getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() { @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) { try { @@ -651,6 +662,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa case AFFINITY_POOL: case UTILITY_CACHE_POOL: case MARSH_CACHE_POOL: + case IDX_POOL: case IGFS_POOL: { if (msg.isOrdered()) @@ -689,7 +701,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @return Execution pool. * @throws IgniteCheckedException If failed. */ - private Executor pool(byte plc) throws IgniteCheckedException { + public Executor pool(byte plc) throws IgniteCheckedException { switch (plc) { case P2P_POOL: return p2pPool; @@ -717,6 +729,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return igfsPool; + case IDX_POOL: + assert idxPool != null : "Indexing pool is not configured."; + + return idxPool; + default: { assert plc >= 0 : "Negative policy: " + plc; @@ -1357,6 +1374,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** * @param node Destination node. * @param topic Topic to send the message to. + * @param topicOrd GridTopic enumeration ordinal. + * @param msg Message to send. + * @param plc Type of processing. + * @throws IgniteCheckedException Thrown in case of any errors. 
+ */ + public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc) + throws IgniteCheckedException { + send(node, topic, topicOrd, msg, plc, false, 0, false, null); + } + + /** + * @param node Destination node. + * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 5f60215..1eebfd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -752,7 +752,7 @@ public class GridIoMessageFactory implements MessageFactory { // [-3..119] [124] - this // [120..123] - DR - // [-4..-22] - SQL + // [-4..-22, -30..-35] - SQL default: if (ext != null) { for (MessageFactory factory : ext) { http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java index 00590ba..70a7354 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java @@ -43,9 +43,12 @@ public class GridIoPolicy { /** Marshaller cache execution pool. */ public static final byte MARSH_CACHE_POOL = 6; - /** Marshaller cache execution pool. */ + /** IGFS pool. */ public static final byte IGFS_POOL = 7; + /** Pool for handling distributed index range requests. */ + public static final byte IDX_POOL = 8; + /** * Defines the range of reserved pools that are not available for plugins. * @param key The key. http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 8633333..e0e8031 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -224,7 +224,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * @param key Key. * @return Affinity key. 
*/ - private Object affinityKey(Object key) { + public Object affinityKey(Object key) { if (key instanceof CacheObject && !(key instanceof BinaryObject)) key = ((CacheObject)key).value(cctx.cacheObjectContext(), false); http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 36d9104..fec43d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -245,6 +245,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** */ private boolean deferredDel; + /** */ + private boolean customAffMapper; + /** * Empty constructor required for {@link Externalizable}. */ @@ -365,6 +368,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return {@code True} if custom {@link AffinityKeyMapper} is configured for cache. + */ + public boolean customAffinityMapper() { + return customAffMapper; + } + + /** * @param dynamicDeploymentId Dynamic deployment ID. */ void dynamicDeploymentId(IgniteUuid dynamicDeploymentId) { @@ -567,6 +577,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return {@code True} if cache is partitioned cache. + */ + public boolean isPartitioned() { + return cacheCfg.getCacheMode() == CacheMode.PARTITIONED; + } + + /** * @return {@code True} in case replication is enabled. 
*/ public boolean isDrEnabled() { @@ -1132,6 +1149,8 @@ public class GridCacheContext<K, V> implements Externalizable { */ public void cacheObjectContext(CacheObjectContext cacheObjCtx) { this.cacheObjCtx = cacheObjCtx; + + customAffMapper = cacheCfg.getAffinityMapper().getClass() != cacheObjCtx.defaultAffMapper().getClass(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java index 5422bbd..4a2f039 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; +import org.jetbrains.annotations.Nullable; /** * Default key affinity mapper. If key class has annotation {@link AffinityKeyMapped}, @@ -120,6 +121,24 @@ public class GridCacheDefaultAffinityKeyMapper implements AffinityKeyMapper { } /** + * @param cls Key class. + * @return Name of the first field or, failing that, the first method that {@code reflectCache} reports for the class; {@code null} if neither exists. + */ + @Nullable public String affinityKeyPropertyName(Class<?> cls) { + Field field = reflectCache.firstField(cls); + + if (field != null) + return field.getName(); + + Method mtd = reflectCache.firstMethod(cls); + + if (mtd != null) + return mtd.getName(); + + return null; + } + + /** * @param ignite Ignite. 
*/ @IgniteInstanceResource http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 0d7bc6a..0005530 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -670,6 +670,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V opCtxCall != null && opCtxCall.isKeepBinary()); if (qry instanceof SqlQuery) { + if (isReplicatedDataNode() && ((SqlQuery)qry).isDistributedJoins()) + throw new CacheException("Queries using distributed JOINs have to be run on partitioned cache, " + + "not on replicated."); + final SqlQuery p = (SqlQuery)qry; if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) @@ -684,6 +688,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } if (qry instanceof SqlFieldsQuery) { + if (isReplicatedDataNode() && ((SqlFieldsQuery)qry).isDistributedJoins()) + throw new CacheException("Queries using distributed JOINs have to be run on partitioned cache, " + + "not on replicated."); + SqlFieldsQuery p = (SqlFieldsQuery)qry; if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 8400594..f4d1d6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -516,6 +516,14 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } /** {@inheritDoc} */ + @Override public String affinityField(String keyType) { + if (binaryCtx == null) + return null; + + return binaryCtx.affinityKeyFieldName(typeId(keyType)); + } + + /** {@inheritDoc} */ @Override public BinaryObjectBuilder builder(String clsName) { return new BinaryObjectBuilderImpl(binaryCtx, clsName); } http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java index d12247e..2f51c5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java @@ -204,7 +204,8 @@ public class GridDhtPartitionsReservation implements GridReservable { if (reservations.compareAndSet(r, r - 1)) { // If it was the last reservation and topology version changed -> attempt to evict partitions. 
- if (r == 1 && !topVer.equals(cctx.topology().topologyVersion())) + if (r == 1 && !cctx.kernalContext().isStopping() && + !topVer.equals(cctx.topology().topologyVersion())) tryEvict(parts.get()); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 4367ee9..60e81e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -75,7 +75,6 @@ import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicat import org.apache.ignite.internal.processors.datastructures.SetItemKey; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; -import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryIndexType; import org.apache.ignite.internal.processors.query.GridQueryProcessor; @@ -270,25 +269,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** - * Gets number of objects of given type in index. - * - * @param valType Value type. - * @return Number of objects or -1 if type was not indexed at all. - * @throws IgniteCheckedException If failed. 
- */ - public long size(Class<?> valType) throws IgniteCheckedException { - if (!enterBusy()) - throw new IllegalStateException("Failed to get size (grid is stopping)."); - - try { - return qryProc.size(space, valType); - } - finally { - leaveBusy(); - } - } - - /** * Rebuilds all search indexes of given value type. * * @param typeName Value type name. @@ -586,26 +566,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte try { switch (qry.type()) { case SQL: - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { - cctx.gridEvents().record(new CacheQueryExecutedEvent<>( - cctx.localNode(), - "SQL query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.SQL.name(), - cctx.namex(), - qry.queryClassName(), - qry.clause(), - null, - null, - args, - subjId, - taskName)); - } - - iter = qryProc.query(space, qry.clause(), F.asList(args), - qry.queryClassName(), filter(qry)); - - break; + throw new IllegalStateException("Should never be called."); case SCAN: if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -764,11 +725,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte else { assert qry.type() == SQL_FIELDS; - GridQueryFieldsResult qryRes = qryProc.queryFields(space, qry.clause(), F.asList(args), filter(qry)); - - res.metaData(qryRes.metaData()); - - res.onDone(qryRes.iterator()); + throw new IllegalStateException("Should never be called."); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMarshallable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMarshallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMarshallable.java new file mode 100644 index 0000000..d87936a 
--- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMarshallable.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.marshaller.Marshaller; + +/** + * Message which needs to be marshalled and unmarshalled before sending or processing it. + */ +public interface GridCacheQueryMarshallable { + /** + * @param m Marshaller. + */ + public void marshall(Marshaller m); + + /** + * @param m Marshaller. + * @param ctx Context. 
+ */ + public void unmarshall(Marshaller m, GridKernalContext ctx); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java index 0733827..6b81ed1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query; import java.nio.ByteBuffer; import java.util.LinkedHashMap; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -35,7 +36,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Query. */ -public class GridCacheSqlQuery implements Message { +public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { /** */ private static final long serialVersionUID = 0L; @@ -130,30 +131,34 @@ public class GridCacheSqlQuery implements Message { return params; } - /** - * @param m Marshaller. - * @throws IgniteCheckedException If failed. 
- */ - public void marshallParams(Marshaller m) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void marshall(Marshaller m) { if (paramsBytes != null) return; assert params != null; - paramsBytes = m.marshal(params); + try { + paramsBytes = m.marshal(params); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } - /** - * @param m Marshaller. - * @throws IgniteCheckedException If failed. - */ - public void unmarshallParams(Marshaller m, GridKernalContext ctx) throws IgniteCheckedException { + /** {@inheritDoc} */ + @Override public void unmarshall(Marshaller m, GridKernalContext ctx) { if (params != null) return; assert paramsBytes != null; - params = m.unmarshal(paramsBytes, U.resolveClassLoader(ctx.config())); + try { + params = m.unmarshal(paramsBytes, U.resolveClassLoader(ctx.config())); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } /** {@inheritDoc} */ @@ -271,4 +276,4 @@ public class GridCacheSqlQuery implements Message { return cp; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java index da59c18..8dcba2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.query; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Set; import 
org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -45,24 +46,54 @@ public class GridCacheTwoStepQuery { private boolean explain; /** */ - private Set<String> spaces; + private Collection<String> spaces; /** */ - private final boolean skipMergeTbl; + private Set<String> schemas; + + /** */ + private Set<String> tbls; + + /** */ + private boolean distributedJoins; + + /** */ + private boolean skipMergeTbl; + + /** */ + private List<Integer> caches; + + /** */ + private List<Integer> extraCaches; /** - * @param spaces All spaces accessed in query. - * @param rdc Reduce query. - * @param skipMergeTbl {@code True} if reduce query can skip merge table creation and - * get data directly from merge index. + * @param schemas Schema names in query. + * @param tbls Tables in query. */ - public GridCacheTwoStepQuery(Set<String> spaces, GridCacheSqlQuery rdc, boolean skipMergeTbl) { - assert rdc != null; + public GridCacheTwoStepQuery(Set<String> schemas, Set<String> tbls) { + this.schemas = schemas; + this.tbls = tbls; + } - this.spaces = spaces; - this.rdc = rdc; - this.skipMergeTbl = skipMergeTbl; + /** + * Specify if distributed joins are enabled for this query. + * + * @param distributedJoins Distributed joins enabled. + */ + public void distributedJoins(boolean distributedJoins) { + this.distributedJoins = distributedJoins; + } + + /** + * Check if distributed joins are enabled for this query. + * + * @return {@code true} If distributed joins enabled. + */ + public boolean distributedJoins() { + return distributedJoins; } + + /** * @return {@code True} if reduce query can skip merge table creation and get data directly from merge index. */ @@ -71,6 +102,13 @@ public class GridCacheTwoStepQuery { } /** + * @param skipMergeTbl Skip merge table. + */ + public void skipMergeTable(boolean skipMergeTbl) { + this.skipMergeTbl = skipMergeTbl; + } + + /** * @return If this is explain query.
*/ public boolean explain() { @@ -116,6 +154,13 @@ public class GridCacheTwoStepQuery { } /** + * @param rdc Reduce query. + */ + public void reduceQuery(GridCacheSqlQuery rdc) { + this.rdc = rdc; + } + + /** * @return Map queries. */ public List<GridCacheSqlQuery> mapQueries() { @@ -123,34 +168,84 @@ public class GridCacheTwoStepQuery { } /** + * @return Caches. + */ + public List<Integer> caches() { + return caches; + } + + /** + * @param caches Caches. + */ + public void caches(List<Integer> caches) { + this.caches = caches; + } + + /** + * @return Caches. + */ + public List<Integer> extraCaches() { + return extraCaches; + } + + /** + * @param extraCaches Caches. + */ + public void extraCaches(List<Integer> extraCaches) { + this.extraCaches = extraCaches; + } + + /** * @return Spaces. */ - public Set<String> spaces() { + public Collection<String> spaces() { return spaces; } /** * @param spaces Spaces. */ - public void spaces(Set<String> spaces) { + public void spaces(Collection<String> spaces) { this.spaces = spaces; } /** + * @return Schemas. + */ + public Set<String> schemas() { + return schemas; + } + + /** * @param args New arguments to copy with. * @return Copy. */ public GridCacheTwoStepQuery copy(Object[] args) { assert !explain; - GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(spaces, rdc.copy(args), skipMergeTbl); + GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls); + + cp.caches = caches; + cp.extraCaches = extraCaches; + cp.spaces = spaces; + cp.rdc = rdc.copy(args); + cp.skipMergeTbl = skipMergeTbl; cp.pageSize = pageSize; + cp.distributedJoins = distributedJoins; + for (int i = 0; i < mapQrys.size(); i++) cp.mapQrys.add(mapQrys.get(i).copy(args)); return cp; } + /** + * @return Tables. 
+ */ + public Set<String> tables() { + return tbls; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheTwoStepQuery.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index b8ac301..c488b3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -187,4 +187,10 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { * @return Ignite binary interface. */ public IgniteBinary binary(); + + /** + * @param keyType Key type name. + * @return Affinity field name or {@code null}.
+ */ + public String affinityField(String keyType); } http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index 3203548..f62ce36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -83,6 +83,11 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ + @Override public String affinityField(String keyType) { + return null; + } + + /** {@inheritDoc} */ @Override public IgniteBinary binary() { return noOpBinary; } http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java deleted file mode 100644 index c17cedd..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.closure; - -import org.jetbrains.annotations.Nullable; - -/** - * This enumeration defines different types of closure - * processing by the closure processor. - */ -public enum GridClosurePolicy { - /** Public execution pool. */ - PUBLIC_POOL, - - /** P2P execution pool. */ - P2P_POOL, - - /** System execution pool. */ - SYSTEM_POOL, - - /** IGFS pool. */ - IGFS_POOL; - - /** Enum values. */ - private static final GridClosurePolicy[] VALS = values(); - - /** - * Efficiently gets enumerated value from its ordinal. - * - * @param ord Ordinal value. - * @return Enumerated value. - */ - @Nullable public static GridClosurePolicy fromOrdinal(int ord) { - return ord >= 0 && ord < VALS.length ? 
VALS[ord] : null; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index f9b74c4..a4559c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -77,6 +77,9 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER; import static org.apache.ignite.compute.ComputeJobResultPolicy.REDUCE; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; @@ -87,15 +90,6 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** Ignite version in which binarylizable versions of closures were introduced. */ public static final IgniteProductVersion BINARYLIZABLE_CLOSURES_SINCE = IgniteProductVersion.fromString("1.6.0"); - /** */ - private final Executor sysPool; - - /** */ - private final Executor pubPool; - - /** */ - private final Executor igfsPool; - /** Lock to control execution after stop. 
*/ private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); @@ -107,10 +101,6 @@ public class GridClosureProcessor extends GridProcessorAdapter { */ public GridClosureProcessor(GridKernalContext ctx) { super(ctx); - - sysPool = ctx.getSystemExecutorService(); - pubPool = ctx.getExecutorService(); - igfsPool = ctx.getIgfsExecutorService(); } /** {@inheritDoc} */ @@ -727,20 +717,8 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param plc Whether to get system or public pool. * @return Requested worker pool. */ - private Executor pool(GridClosurePolicy plc) { - switch (plc) { - case PUBLIC_POOL: - return pubPool; - - case SYSTEM_POOL: - return sysPool; - - case IGFS_POOL: - return igfsPool; - - default: - throw new IllegalArgumentException("Invalid closure execution policy: " + plc); - } + private Executor pool(byte plc) throws IgniteCheckedException { + return ctx.io().pool(plc); } /** @@ -749,7 +727,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param plc Policy to choose executor pool. * @return Pool name. */ - private String poolName(GridClosurePolicy plc) { + private String poolName(byte plc) { switch (plc) { case PUBLIC_POOL: return "public"; @@ -761,7 +739,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { return "igfs"; default: - throw new IllegalArgumentException("Invalid closure execution policy: " + plc); + return "unknown"; } } @@ -772,7 +750,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException { - return runLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + return runLocal(c, sys ? SYSTEM_POOL : PUBLIC_POOL); } /** @@ -781,7 +759,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. 
* @throws IgniteCheckedException Thrown in case of any errors. */ - private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, GridClosurePolicy plc) throws IgniteCheckedException { + public IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, byte plc) throws IgniteCheckedException { if (c == null) return new GridFinishedFuture(); @@ -857,7 +835,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. */ public IgniteInternalFuture<?> runLocalSafe(Runnable c, boolean sys) { - return runLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + return runLocalSafe(c, sys ? SYSTEM_POOL : PUBLIC_POOL); } /** @@ -868,7 +846,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param plc Policy to choose executor pool. * @return Future. */ - public IgniteInternalFuture<?> runLocalSafe(Runnable c, GridClosurePolicy plc) { + public IgniteInternalFuture<?> runLocalSafe(Runnable c, byte plc) { try { return runLocal(c, plc); } @@ -912,7 +890,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, boolean sys) throws IgniteCheckedException { - return callLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + return callLocal(c, sys ? SYSTEM_POOL : PUBLIC_POOL); } /** @@ -922,7 +900,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. * @throws IgniteCheckedException Thrown in case of any errors. 
*/ - private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, GridClosurePolicy plc) throws IgniteCheckedException { + private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc) throws IgniteCheckedException { if (c == null) return new GridFinishedFuture<>(); @@ -996,7 +974,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @return Future. */ public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, boolean sys) { - return callLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + return callLocalSafe(c, sys ? SYSTEM_POOL : PUBLIC_POOL); } /** @@ -1007,7 +985,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param plc Policy to choose executor pool. * @return Future. */ - public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, GridClosurePolicy plc) { + public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) { try { return callLocal(c, plc); } http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 7697a12..643cb8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import 
org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.lang.IgniteBiTuple; @@ -58,17 +57,6 @@ public interface GridQueryIndexing { public void stop() throws IgniteCheckedException; /** - * Runs two step query. - * - * @param cctx Cache context. - * @param qry Query. - * @param keepCacheObjects If {@code true}, cache objects representation will be preserved. - * @return Cursor. - */ - public Iterable<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, - boolean keepCacheObjects); - - /** * Parses SQL query into two step query and executes it. * * @param cctx Cache context. @@ -92,12 +80,13 @@ public interface GridQueryIndexing { * @param spaceName Space name. * @param qry Query. * @param params Query parameters. - * @param filters Space name and key filters. + * @param filter Space name and key filter. + * @param enforceJoinOrder Enforce join order of tables in the query. * @return Query result. * @throws IgniteCheckedException If failed. */ - public GridQueryFieldsResult queryFields(@Nullable String spaceName, String qry, - Collection<Object> params, IndexingQueryFilter filters) throws IgniteCheckedException; + public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry, + Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder) throws IgniteCheckedException; /** * Executes regular query. @@ -106,12 +95,12 @@ public interface GridQueryIndexing { * @param qry Query. * @param params Query parameters. * @param type Query return type. - * @param filters Space name and key filters. + * @param filter Space name and key filter. * @return Queried rows. * @throws IgniteCheckedException If failed. 
*/ - public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(@Nullable String spaceName, String qry, - Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filters) throws IgniteCheckedException; + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, String qry, + Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filter) throws IgniteCheckedException; /** * Executes text query. @@ -119,35 +108,24 @@ public interface GridQueryIndexing { * @param spaceName Space name. * @param qry Text query. * @param type Query return type. - * @param filters Space name and key filter. + * @param filter Space name and key filter. * @return Queried rows. * @throws IgniteCheckedException If failed. */ - public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryText(@Nullable String spaceName, String qry, - GridQueryTypeDescriptor type, IndexingQueryFilter filters) throws IgniteCheckedException; - - /** - * Gets size of index for given type or -1 if it is a unknown type. - * - * @param spaceName Space name. - * @param desc Type descriptor. - * @param filters Filters. - * @return Objects number. - * @throws IgniteCheckedException If failed. - */ - public long size(@Nullable String spaceName, GridQueryTypeDescriptor desc, IndexingQueryFilter filters) - throws IgniteCheckedException; + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(@Nullable String spaceName, String qry, + GridQueryTypeDescriptor type, IndexingQueryFilter filter) throws IgniteCheckedException; /** * Registers cache. * + * @param cctx Cache context. * @param ccfg Cache configuration. * @throws IgniteCheckedException If failed. */ - public void registerCache(CacheConfiguration<?,?> ccfg) throws IgniteCheckedException; + public void registerCache(GridCacheContext<?,?> cctx, CacheConfiguration<?,?> ccfg) throws IgniteCheckedException; /** - * Deregisters cache. + * Unregisters cache. 
* * @param ccfg Cache configuration. * @throws IgniteCheckedException If failed to drop cache schema. @@ -228,12 +206,11 @@ public interface GridQueryIndexing { /** * Returns backup filter. * - * @param caches List of caches. * @param topVer Topology version. * @param parts Partitions. * @return Backup filter. */ - public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer, int[] parts); + public IndexingQueryFilter backupFilter(AffinityTopologyVersion topVer, int[] parts); /** * Client disconnected callback. http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index a42eb98..04c6cb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -28,6 +28,7 @@ import org.apache.ignite.cache.CacheTypeMetadata; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.QueryIndexType; +import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; @@ -42,11 +43,11 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import 
org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; -import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -191,11 +192,13 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * @param ccfg Cache configuration. + * @param cctx Cache context. * @throws IgniteCheckedException If failed. */ - public void initializeCache(CacheConfiguration<?, ?> ccfg) throws IgniteCheckedException { - idx.registerCache(ccfg); + private void initializeCache(GridCacheContext<?, ?> cctx) throws IgniteCheckedException { + CacheConfiguration<?,?> ccfg = cctx.config(); + + idx.registerCache(cctx, cctx.config()); try { List<Class<?>> mustDeserializeClss = null; @@ -275,10 +278,28 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (valCls != null) altTypeId = new TypeId(ccfg.getName(), valCls); + + if (!cctx.customAffinityMapper() && qryEntity.getKeyType() != null) { + // Need to setup affinity key for distributed joins. 
+ String affField = ctx.cacheObjects().affinityField(qryEntity.getKeyType()); + + if (affField != null) + desc.affinityKey(affField); + } } else { processClassMeta(qryEntity, desc, coCtx); + AffinityKeyMapper keyMapper = cctx.config().getAffinityMapper(); + + if (keyMapper instanceof GridCacheDefaultAffinityKeyMapper) { + String affField = + ((GridCacheDefaultAffinityKeyMapper)keyMapper).affinityKeyPropertyName(desc.keyCls); + + if (affField != null) + desc.affinityKey(affField); + } + typeId = new TypeId(ccfg.getName(), valCls); altTypeId = new TypeId(ccfg.getName(), ctx.cacheObjects().typeId(qryEntity.getValueType())); } @@ -290,7 +311,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { types.put(altTypeId, desc); desc.registered(idx.registerType(ccfg.getName(), desc)); - } } @@ -460,7 +480,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { return; try { - initializeCache(cctx.config()); + initializeCache(cctx); } finally { busyLock.leaveBusy(); @@ -501,33 +521,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * Returns number of objects of given type for given space of spi. - * - * @param space Space. - * @param valType Value type. - * @return Objects number or -1 if this type is unknown for given SPI and space. - * @throws IgniteCheckedException If failed. - */ - public long size(@Nullable String space, Class<?> valType) throws IgniteCheckedException { - checkEnabled(); - - if (!busyLock.enterBusy()) - throw new IllegalStateException("Failed to get space size (grid is stopping)."); - - try { - TypeDescriptor desc = types.get(new TypeId(space, valType)); - - if (desc == null || !desc.registered()) - return -1; - - return idx.size(space, desc, null); - } - finally { - busyLock.leaveBusy(); - } - } - - /** * Rebuilds all search indexes of given value type for given space of spi. * * @param space Space. 
@@ -657,7 +650,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { return; if (!busyLock.enterBusy()) - throw new IllegalStateException("Failed to write to index (grid is stopping)."); + return; try { if (coctx == null) @@ -723,74 +716,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * @param space Space. - * @param clause Clause. - * @param params Parameters collection. - * @param resType Result type. - * @param filters Filters. - * @return Key/value rows. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(final String space, final String clause, - final Collection<Object> params, final String resType, final IndexingQueryFilter filters) - throws IgniteCheckedException { - checkEnabled(); - - if (!busyLock.enterBusy()) - throw new IllegalStateException("Failed to execute query (grid is stopping)."); - - try { - final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context(); - - return executeQuery(cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() { - @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException { - TypeDescriptor type = typesByName.get(new TypeName(space, resType)); - - if (type == null || !type.registered()) - throw new CacheException("Failed to find SQL table for type: " + resType); - - return idx.query(space, clause, params, type, filters); - } - }, false); - } - finally { - busyLock.leaveBusy(); - } - } - - /** - * @param space Space name. - * @param qry Query. - * @return Cursor. 
- */ - public Iterable<List<?>> queryTwoStep(String space, final GridCacheTwoStepQuery qry) { - checkxEnabled(); - - if (!busyLock.enterBusy()) - throw new IllegalStateException("Failed to execute query (grid is stopping)."); - - try { - final GridCacheContext<Object, Object> cctx = ctx.cache().internalCache(space).context(); - - return executeQuery(cctx, new IgniteOutClosureX<Iterable<List<?>>>() { - @Override public Iterable<List<?>> applyx() throws IgniteCheckedException { - return idx.queryTwoStep( - cctx, - qry, - cctx.keepBinary()); - } - }, false); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - finally { - busyLock.leaveBusy(); - } - } - - /** * @param cctx Cache context. * @param qry Query. * @return Cursor. @@ -845,6 +770,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param cctx Cache context. * @param qry Query. + * @param keepBinary Keep binary flag. * @return Cursor. */ public <K, V> Iterator<Cache.Entry<K, V>> queryLocal( @@ -873,12 +799,12 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (typeDesc == null || !typeDesc.registered()) throw new CacheException("Failed to find SQL table for type: " + type); - final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.query( + final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.queryLocalSql( space, sqlQry, F.asList(params), typeDesc, - idx.backupFilter(null, null, null)); + idx.backupFilter(null, null)); sendQueryExecutedEvent( sqlQry, @@ -963,8 +889,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { String sql = qry.getSql(); Object[] args = qry.getArgs(); - final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), - idx.backupFilter(null, null, null)); + final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args), + idx.backupFilter(null, null), qry.isEnforceJoinOrder()); sendQueryExecutedEvent(sql, args); @@ -1118,7 +1044,7 @@ public class GridQueryProcessor 
extends GridProcessorAdapter { if (type == null || !type.registered()) throw new CacheException("Failed to find SQL table for type: " + resType); - return idx.queryText( + return idx.queryLocalText( space, clause, type, @@ -1132,35 +1058,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * @param space Space name. - * @param clause Clause. - * @param params Parameters collection. - * @param filters Key and value filters. - * @return Field rows. - * @throws IgniteCheckedException If failed. - */ - public GridQueryFieldsResult queryFields(@Nullable final String space, final String clause, - final Collection<Object> params, final IndexingQueryFilter filters) throws IgniteCheckedException { - checkEnabled(); - - if (!busyLock.enterBusy()) - throw new IllegalStateException("Failed to execute query (grid is stopping)."); - - try { - final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context(); - - return executeQuery(cctx, new IgniteOutClosureX<GridQueryFieldsResult>() { - @Override public GridQueryFieldsResult applyx() throws IgniteCheckedException { - return idx.queryFields(space, clause, params, filters); - } - }, false); - } - finally { - busyLock.leaveBusy(); - } - } - - /** * Will be called when entry for key will be swapped. * * @param spaceName Space name. 
@@ -1523,7 +1420,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { aliases, coCtx); - d.addProperty(prop, false); } @@ -1770,7 +1666,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { res = clo.apply(); if (res instanceof CacheQueryFuture) { - CacheQueryFuture fut = (CacheQueryFuture) res; + CacheQueryFuture fut = (CacheQueryFuture)res; err = fut.error(); } @@ -1782,6 +1678,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw (IgniteCheckedException)err; } + catch (CacheException e) { + err = e; + + throw e; + } catch (Exception e) { err = e; @@ -2096,6 +1997,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** */ private boolean valTextIdx; + /** */ + private String affKey; + /** SPI can decide not to register this type. */ private boolean registered; @@ -2265,6 +2169,18 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ + @Override public String affinityKey() { + return affKey; + } + + /** + * @param affKey Affinity key field. 
+ */ + void affinityKey(String affKey) { + this.affKey = affKey; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TypeDescriptor.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java index 45919ef..b636841 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java @@ -82,4 +82,11 @@ public interface GridQueryTypeDescriptor { * @return If string representation of value should be full-text indexed. */ public boolean valueTextIndex(); + + /** + * Returns affinity key field name or {@code null} for default. + * + * @return Affinity key. 
+ */ + public String affinityKey(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java index ecc0abd..6706ab9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java @@ -117,4 +117,4 @@ public class GridQueryCancelRequest implements Message { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java index 499438d..bd9f7d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java @@ -143,4 +143,4 @@ public class GridQueryFailResponse implements Message { @Override public byte fieldsCount() { return 2; } -} \ No newline at end of file +} 
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java index 84cb57e..1feff5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java @@ -170,4 +170,4 @@ public class GridQueryNextPageRequest implements Message { @Override public byte fieldsCount() { return 3; } -} \ No newline at end of file +}
