Repository: ignite Updated Branches: refs/heads/ignite-4565-ddl a4d01a632 -> 318ddedaf
WIP. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9622039a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9622039a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9622039a Branch: refs/heads/ignite-4565-ddl Commit: 9622039a540957ef73f558dbc99da0dbe88f38da Parents: a4d01a6 Author: devozerov <[email protected]> Authored: Thu Mar 9 17:28:14 2017 +0300 Committer: devozerov <[email protected]> Committed: Thu Mar 9 17:28:14 2017 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryProcessor.java | 39 +++++------ .../processors/query/QueryIndexHandler.java | 68 ++++++++++++++++++-- .../query/QueryTypeDescriptorImpl.java | 53 +++------------ 3 files changed, 94 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9622039a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index f2def38..cf3cbbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -144,6 +144,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** */ private final GridQueryIndexing idx; + /** Index handler. */ + private final QueryIndexHandler idxHnd; + /** */ private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask; @@ -163,6 +166,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { } else idx = INDEXING.inClassPath() ? U.<GridQueryIndexing>newInstance(INDEXING.className()) : null; + + idxHnd = new QueryIndexHandler(ctx); } /** {@inheritDoc} */ @@ -223,6 +228,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { idx.registerCache(cctx, cctx.config()); try { + Collection<QueryTypeDescriptorImpl> typeDescs = new ArrayList<>(); + List<Class<?>> mustDeserializeClss = null; boolean binaryEnabled = ctx.cacheObjects().isBinaryEnabled(ccfg); @@ -234,9 +241,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (F.isEmpty(qryEntity.getValueType())) throw new IgniteCheckedException("Value type is not set: " + qryEntity); - QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl(); - - desc.space(cctx.name()); + QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl(cctx.name()); // Key and value classes still can be available if they are primitive or JDK part. // We need that to set correct types for _key and _val columns. @@ -333,8 +338,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { altTypeId = new QueryTypeIdKey(ccfg.getName(), ctx.cacheObjects().typeId(qryEntity.getValueType())); } - desc.onInitialStateReady(); - addTypeByName(ccfg, desc); types.put(typeId, desc); @@ -342,6 +345,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { types.put(altTypeId, desc); desc.registered(idx.registerType(ccfg.getName(), desc)); + + typeDescs.add(desc); } } @@ -354,9 +359,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { meta.getDescendingFields().isEmpty() && meta.getGroups().isEmpty()) continue; - QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl(); - - desc.space(cctx.name()); + QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl(cctx.name()); // Key and value classes still can be available if they are primitive or JDK part. // We need that to set correct types for _key and _val columns. @@ -428,8 +431,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { altTypeId = new QueryTypeIdKey(ccfg.getName(), ctx.cacheObjects().typeId(meta.getValueType())); } - desc.onInitialStateReady(); - addTypeByName(ccfg, desc); types.put(typeId, desc); @@ -437,10 +438,12 @@ public class GridQueryProcessor extends GridProcessorAdapter { types.put(altTypeId, desc); desc.registered(idx.registerType(ccfg.getName(), desc)); + + typeDescs.add(desc); } } - // Indexed types must be translated to CacheTypeMetadata in CacheConfiguration. + idxHnd.onCacheCreated(cctx.name(), typeDescs); if (mustDeserializeClss != null) { U.warn(log, "Some classes in query configuration cannot be written in binary format " + @@ -517,6 +520,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + idxHnd.onDisconnected(); + if (idx != null) idx.onDisconnected(reconnectFut); } @@ -551,6 +556,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { return; try { + idxHnd.onCacheStopped(cctx.name()); + idx.unregisterCache(cctx.config()); Iterator<Map.Entry<QueryTypeIdKey, QueryTypeDescriptorImpl>> it = types.entrySet().iterator(); @@ -959,13 +966,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @return Future completed when index is created. */ public IgniteInternalFuture<?> createIndex(String space, String tblName, QueryIndex idx, boolean ifNotExists) { - for (QueryTypeDescriptorImpl desc : types.values()) { - if (desc.matchSpaceAndTable(space, tblName)) - return desc.dynamicIndexCreate(idx, ifNotExists); - } - - return new GridFinishedFuture<>(new IgniteException("Failed to create index becase table is not found [" + - "space=" + space + ", table=" + tblName + ']')); + return idxHnd.onCreateIndex(space, tblName, idx, ifNotExists); } /** @@ -1297,6 +1298,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (ldr.equals(U.detectClassLoader(desc.valueClass())) || ldr.equals(U.detectClassLoader(desc.keyClass()))) { + idxHnd.onTypeUnregistered(desc); + idx.unregisterType(e.getKey().space(), desc); it.remove(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9622039a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexHandler.java index 7585dbb..f114d14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexHandler.java @@ -1,12 +1,15 @@ package org.apache.ignite.internal.processors.query; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -17,6 +20,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * Index state manager. */ public class QueryIndexHandler { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + /** Indexes. */ private final Map<String, QueryIndexDescriptorImpl> idxs = new ConcurrentHashMap<>(); @@ -27,12 +36,50 @@ public class QueryIndexHandler { private final ReadWriteLock lock = new ReentrantReadWriteLock(); /** - * Handle initial index state. + * Constructor. + * + * @param ctx Kernal context. + */ + public QueryIndexHandler(GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(QueryIndexHandler.class); + } + + /** + * Handle cache creation. + * + * @param cacheName Cache name. + * @param typDescs Type descriptors. + */ + public void onCacheCreated(String cacheName, Collection<QueryTypeDescriptorImpl> typDescs) { + // TODO: Make sure indexes are unique. +// this.idxs.put(typ.indexes()); + } + + /** + * Handle cache stop. + * + * @param cacheName Cache name. + */ + public void onCacheStopped(String cacheName) { + // TODO + } + + /** + * Handle type unregister. * - * @param idxs Indexes. + * @param desc Descriptor. */ - public void onInitialStateReady(Map<String, QueryIndexDescriptorImpl> idxs) { - this.idxs.putAll(idxs); + public void onTypeUnregistered(QueryTypeDescriptorImpl desc) { + // TODO + } + + /** + * Handle disconnect. + */ + public void onDisconnected() { + // TODO } /** @@ -42,7 +89,18 @@ public class QueryIndexHandler { * @param ifNotExists IF-NOT-EXISTS flag. * @return Future completed when index is created. */ - public IgniteInternalFuture<?> onCreateIndex(QueryIndex idx, boolean ifNotExists) { + public IgniteInternalFuture<?> onCreateIndex(String cacheName, String tblName, QueryIndex idx, + boolean ifNotExists) { + // TODO: Integrated from previous impl: +// for (QueryTypeDescriptorImpl desc : types.values()) { +// if (desc.matchCacheAndTable(space, tblName)) +// return desc.dynamicIndexCreate(idx, ifNotExists); +// } +// +// return new GridFinishedFuture<>(new IgniteException("Failed to create index becase table is not found [" + +// "space=" + space + ", table=" + tblName + ']')); + + lock.writeLock().lock(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/9622039a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java index 5b8efcc..983a6f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java @@ -18,12 +18,9 @@ package org.apache.ignite.internal.processors.query; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.QueryIndexType; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; @@ -36,8 +33,8 @@ import java.util.Map; * Descriptor of type. */ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { - /** Space. */ - private String space; + /** Cache name. */ + private final String cacheName; /** */ private String name; @@ -60,9 +57,6 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { @GridToStringInclude private final Map<String, QueryIndexDescriptorImpl> indexes = new HashMap<>(); - /** Index state manager. */ - private final QueryIndexHandler idxState = new QueryIndexHandler(); - /** */ private QueryIndexDescriptorImpl fullTextIdx; @@ -88,17 +82,19 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { private boolean registered; /** - * @return Space. + * Constructor. + * + * @param cacheName Cache name. */ - public String space() { - return space; + public QueryTypeDescriptorImpl(String cacheName) { + this.cacheName = cacheName; } /** - * @param space Space. + * @return Cache name. */ - public void space(String space) { - this.space = space; + public String cacheName() { + return cacheName; } /** @@ -354,35 +350,6 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { this.affKey = affKey; } - /** - * Check whether space and table name matches. - * - * @param space Space. - * @param tblName Table name. - * @return {@code True} if matches. - */ - public boolean matchSpaceAndTable(String space, String tblName) { - return F.eq(space, this.space) && F.eq(tblName, this.tblName); - } - - /** - * Callback invoked when initial type state is ready. - */ - public void onInitialStateReady() { - idxState.onInitialStateReady(indexes); - } - - /** - * Initiate asynchronous index creation. - * - * @param idx Index description. - * @param ifNotExists When set to {@code true} operation will fail if index already exists. - * @return Future completed when index is created. - */ - public IgniteInternalFuture<?> dynamicIndexCreate(QueryIndex idx, boolean ifNotExists) { - return idxState.onCreateIndex(idx, ifNotExists); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(QueryTypeDescriptorImpl.class, this);
