http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 2b957be..ceb139a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -17,34 +17,29 @@
 
 package org.apache.ignite.internal.processors.query;
 
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import javax.cache.Cache;
-import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -54,7 +49,23 @@ import 
org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import 
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
+import 
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
+import 
org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
+import org.apache.ignite.internal.processors.query.schema.SchemaKey;
+import 
org.apache.ignite.internal.processors.query.schema.SchemaOperationClientFuture;
+import 
org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
+import 
org.apache.ignite.internal.processors.query.schema.SchemaOperationManager;
+import 
org.apache.ignite.internal.processors.query.schema.SchemaOperationWorker;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
+import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
+import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperation;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -63,17 +74,45 @@ import 
org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.util.worker.GridWorkerFuture;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
+
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_SCHEMA;
 import static org.apache.ignite.internal.IgniteComponentType.INDEXING;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SCHEMA_POOL;
 
 /**
  * Indexing processor.
@@ -82,26 +121,70 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     /** Queries detail metrics eviction frequency. */
     private static final int QRY_DETAIL_METRICS_EVICTION_FREQ = 3_000;
 
+    /** */
+    private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = 
new ThreadLocal<>();
+
     /** For tests. */
     public static Class<? extends GridQueryIndexing> idxCls;
 
+    /** JDK marshaller to serialize errors. */
+    private final JdkMarshaller marsh = new JdkMarshaller();
+
     /** */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
+    /** */
+    private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask;
+
     /** Type descriptors. */
-    private final Map<QueryTypeIdKey, QueryTypeDescriptorImpl> types = new 
ConcurrentHashMap8<>();
+    private final Map<QueryTypeIdKey, QueryTypeDescriptorImpl> types = new 
ConcurrentHashMap<>();
 
     /** Type descriptors. */
-    private final ConcurrentMap<QueryTypeNameKey, QueryTypeDescriptorImpl> 
typesByName = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<QueryTypeNameKey, QueryTypeDescriptorImpl> 
typesByName = new ConcurrentHashMap<>();
 
     /** */
     private final GridQueryIndexing idx;
 
-    /** */
-    private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask;
+    /** All indexes. */
+    private final ConcurrentMap<QueryIndexKey, QueryIndexDescriptorImpl> idxs 
= new ConcurrentHashMap<>();
 
-    /** */
-    private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = 
new ThreadLocal<>();
+    /** Schema operation futures created on client side. */
+    private final ConcurrentMap<UUID, SchemaOperationClientFuture> 
schemaCliFuts = new ConcurrentHashMap<>();
+
+    /** IO message listener. */
+    private final GridMessageListener ioLsnr;
+
+    /** Schema operations. */
+    private final ConcurrentHashMap<SchemaKey, SchemaOperation> schemaOps = 
new ConcurrentHashMap<>();
+
+    /** Active propose messages. */
+    private final LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> 
activeProposals = new LinkedHashMap<>();
+
+    /** General state mutex. */
+    private final Object stateMux = new Object();
+
+    /** Coordinator node (initialized lazily). */
+    private ClusterNode crd;
+
+    /** Registered spaces. */
+    private final Collection<String> spaces = Collections.newSetFromMap(new 
ConcurrentHashMap<String, Boolean>());
+
+    /** ID history for index create/drop discovery messages. */
+    private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> dscoMsgIdHist 
=
+        new 
GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize());
+
+    /** History of already completed operations. */
+    private final GridBoundedConcurrentLinkedHashSet<UUID> completedOpIds =
+        new 
GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize());
+
+    /** Pending status messages. */
+    private final LinkedList<SchemaOperationStatusMessage> pendingMsgs = new 
LinkedList<>();
+
+    /** Disconnected flag. */
+    private boolean disconnected;
+
+    /** Whether exchange thread is ready to process further requests. */
+    private boolean exchangeReady;
 
     /** */
     private boolean skipFieldLookup;
@@ -119,6 +202,20 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         }
         else
             idx = INDEXING.inClassPath() ? 
U.<GridQueryIndexing>newInstance(INDEXING.className()) : null;
+
+        ioLsnr = new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                if (msg instanceof SchemaOperationStatusMessage) {
+                    SchemaOperationStatusMessage msg0 = 
(SchemaOperationStatusMessage)msg;
+
+                    msg0.senderNodeId(nodeId);
+
+                    processStatusMessage(msg0);
+                }
+                else
+                    U.warn(log, "Unsupported IO message: " + msg);
+            }
+        };
     }
 
     /** {@inheritDoc} */
@@ -131,6 +228,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             idx.start(ctx, busyLock);
         }
 
+        ctx.io().addMessageListener(TOPIC_SCHEMA, ioLsnr);
+
         // Schedule queries detail metrics eviction.
         qryDetailMetricsEvictTask = ctx.timeout().schedule(new Runnable() {
             @Override public void run() {
@@ -140,6 +239,401 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         }, QRY_DETAIL_METRICS_EVICTION_FREQ, QRY_DETAIL_METRICS_EVICTION_FREQ);
     }
 
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        if (cancel && idx != null) {
+            try {
+                while (!busyLock.tryBlock(500))
+                    idx.cancelAllQueries();
+
+                return;
+            } catch (InterruptedException ignored) {
+                U.warn(log, "Interrupted while waiting for active queries 
cancellation.");
+
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        busyLock.block();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        super.stop(cancel);
+
+        ctx.io().removeMessageListener(TOPIC_SCHEMA, ioLsnr);
+
+        if (idx != null)
+            idx.stop();
+
+        U.closeQuiet(qryDetailMetricsEvictTask);
+    }
+
+    /**
+     * Handle cache kernal start. At this point discovery and IO managers are 
operational, caches are not started yet.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onCacheKernalStart() throws IgniteCheckedException {
+        synchronized (stateMux) {
+            exchangeReady = true;
+
+            // Re-run pending top-level proposals.
+            for (SchemaOperation schemaOp : schemaOps.values())
+                onSchemaPropose(schemaOp.proposeMessage());
+        }
+    }
+
+    /**
+     * Handle cache reconnect.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onCacheReconnect() throws IgniteCheckedException {
+        synchronized (stateMux) {
+            assert disconnected;
+
+            disconnected = false;
+
+            onCacheKernalStart();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return DiscoveryDataExchangeType.QUERY_PROC;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        // Collect active proposals.
+        synchronized (stateMux) {
+            LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> data = new 
LinkedHashMap<>(activeProposals);
+
+            
dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), data);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void 
onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        synchronized (stateMux) {
+            // Preserve proposals.
+            LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> data0 =
+                (LinkedHashMap<UUID, 
SchemaProposeDiscoveryMessage>)data.commonData();
+
+            // Process proposals as if they were received as regular discovery 
messages.
+            if (data0 != null) {
+                for (SchemaProposeDiscoveryMessage activeProposal : 
data0.values())
+                    onSchemaProposeDiscovery0(activeProposal);
+            }
+        }
+    }
+
+    /**
+     * Process schema propose message from discovery thread.
+     *
+     * @param msg Message.
+     * @return {@code True} if exchange should be triggered.
+     */
+    private boolean onSchemaProposeDiscovery(SchemaProposeDiscoveryMessage 
msg) {
+        UUID opId = msg.operation().id();
+        String space = msg.operation().space();
+
+        if (!msg.initialized()) {
+            // Ensure cache exists on coordinator node.
+            DynamicCacheDescriptor cacheDesc = 
ctx.cache().cacheDescriptor(space);
+
+            if (cacheDesc == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Received schema propose discovery message, but 
cache doesn't exist " +
+                        "(will report error) [opId=" + opId + ", msg=" + msg + 
']');
+
+                msg.onError(new 
SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, space));
+            }
+            else {
+                CacheConfiguration ccfg = cacheDesc.cacheConfiguration();
+
+                if (ccfg.getCacheMode() == CacheMode.LOCAL) {
+                    // Distributed operation is not allowed on LOCAL caches.
+                    if (log.isDebugEnabled())
+                        log.debug("Received schema propose discovery message, 
but cache is LOCAL " +
+                            "(will report error) [opId=" + opId + ", msg=" + 
msg + ']');
+
+                    msg.onError(new SchemaOperationException("Schema changes 
are not supported for LOCAL cache."));
+                }
+                else {
+                    // Preserve deployment ID so that we can distinguish 
between different caches with the same name.
+                    if (msg.deploymentId() == null)
+                        msg.deploymentId(cacheDesc.deploymentId());
+
+                    assert F.eq(cacheDesc.deploymentId(), msg.deploymentId());
+                }
+            }
+        }
+
+        // Complete client future and exit immediately in case of error.
+        if (msg.hasError()) {
+            SchemaOperationClientFuture cliFut = schemaCliFuts.remove(opId);
+
+            if (cliFut != null)
+                cliFut.onDone(msg.error());
+
+            return false;
+        }
+
+        return onSchemaProposeDiscovery0(msg);
+    }
+
+    /**
+     * Process schema propose message from discovery thread (or from cache 
start routine).
+     *
+     * @param msg Message.
+     * @return {@code True} if exchange should be triggered.
+     */
+    private boolean onSchemaProposeDiscovery0(SchemaProposeDiscoveryMessage 
msg) {
+        UUID opId = msg.operation().id();
+
+        synchronized (stateMux) {
+            if (disconnected) {
+                if (log.isDebugEnabled())
+                    log.debug("Processing discovery schema propose message, 
but node is disconnected (will ignore) " +
+                        "[opId=" + opId + ", msg=" + msg + ']');
+
+                return false;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Processing discovery schema propose message [opId=" 
+ opId + ", msg=" + msg + ']');
+
+            // Put message to active operations set.
+            SchemaProposeDiscoveryMessage oldDesc = 
activeProposals.put(msg.operation().id(), msg);
+
+            assert oldDesc == null;
+
+            // Create schema operation and either trigger it immediately from 
exchange thread or append to already
+            // running operation.
+            SchemaOperation schemaOp = new SchemaOperation(msg);
+
+            SchemaKey key = msg.schemaKey();
+
+            SchemaOperation prevSchemaOp = schemaOps.get(key);
+
+            if (prevSchemaOp != null) {
+                prevSchemaOp = prevSchemaOp.unwind();
+
+                if (log.isDebugEnabled())
+                    log.debug("Schema change is enqueued and will be executed 
after previous operation is completed " +
+                        "[opId=" + opId + ", prevOpId=" + prevSchemaOp.id() + 
']');
+
+                prevSchemaOp.next(schemaOp);
+
+                return false;
+            }
+            else {
+                schemaOps.put(key, schemaOp);
+
+                return exchangeReady;
+            }
+        }
+    }
+
+    /**
+     * Handle schema propose from exchange thread.
+     *
+     * @param msg Discovery message.
+     */
+    @SuppressWarnings("ThrowableInstanceNeverThrown")
+    public void onSchemaPropose(SchemaProposeDiscoveryMessage msg) {
+        UUID opId = msg.operation().id();
+
+        if (log.isDebugEnabled())
+            log.debug("Processing schema propose message (exchange) [opId=" + 
opId + ']');
+
+        synchronized (stateMux) {
+            if (disconnected)
+                return;
+
+            SchemaOperation curOp = schemaOps.get(msg.schemaKey());
+
+            assert curOp != null;
+            assert F.eq(opId, curOp.id());
+            assert !curOp.started();
+
+            startSchemaChange(curOp);
+        }
+    }
+
+    /**
+     * Process schema finish message from discovery thread.
+     *
+     * @param msg Message.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    private void onSchemaFinishDiscovery(SchemaFinishDiscoveryMessage msg) {
+        UUID opId = msg.operation().id();
+
+        if (log.isDebugEnabled())
+            log.debug("Received schema finish message (discovery) [opId=" + 
opId + ", msg=" + msg + ']');
+
+        synchronized (stateMux) {
+            if (disconnected)
+                return;
+
+            boolean completedOpAdded = completedOpIds.add(opId);
+
+            assert completedOpAdded;
+
+            // Remove propose message so that it will not be shared with 
joining nodes.
+            SchemaProposeDiscoveryMessage proposeMsg = 
activeProposals.remove(opId);
+
+            assert proposeMsg != null;
+
+            // Apply changes to public cache schema if operation is successful 
and original cache is still there.
+            if (!msg.hasError()) {
+                DynamicCacheDescriptor cacheDesc = 
ctx.cache().cacheDescriptor(msg.operation().space());
+
+                if (cacheDesc != null && F.eq(cacheDesc.deploymentId(), 
proposeMsg.deploymentId()))
+                    cacheDesc.schemaChangeFinish(msg);
+            }
+
+            // Attach propose message to the finish message: it will be used from exchange thread.
+            msg.proposeMessage(proposeMsg);
+
+            if (exchangeReady) {
+                SchemaOperation op = schemaOps.get(proposeMsg.schemaKey());
+
+                if (F.eq(op.id(), opId)) {
+                    // Completed top operation.
+                    op.finishMessage(msg);
+
+                    if (op.started())
+                        op.doFinish();
+                }
+                else {
+                    // Completed operation in the middle, will schedule 
completion later.
+                    while (op != null) {
+                        if (F.eq(op.id(), opId))
+                            break;
+
+                        op = op.next();
+                    }
+
+                    assert op != null;
+                    assert !op.started();
+
+                    op.finishMessage(msg);
+                }
+            }
+            else {
+                // Set next operation as top-level one.
+                SchemaKey schemaKey = proposeMsg.schemaKey();
+
+                SchemaOperation op = schemaOps.remove(schemaKey);
+
+                assert op != null;
+                assert F.eq(op.id(), opId);
+
+                // Chain to the next operation (if any).
+                SchemaOperation nextOp = op.next();
+
+                if (nextOp != null)
+                    schemaOps.put(schemaKey, nextOp);
+            }
+
+            // Clean stale IO messages from just-joined nodes.
+            cleanStaleStatusMessages(opId);
+        }
+
+        // Complete client future (if any).
+        SchemaOperationClientFuture cliFut = schemaCliFuts.remove(opId);
+
+        if (cliFut != null) {
+            if (msg.hasError())
+                cliFut.onDone(msg.error());
+            else
+                cliFut.onDone();
+        }
+    }
+
+    /**
+     * Initiate actual schema change operation.
+     *
+     * @param schemaOp Schema operation.
+     */
+    @SuppressWarnings({"unchecked", "ThrowableInstanceNeverThrown"})
+    private void startSchemaChange(SchemaOperation schemaOp) {
+        assert Thread.holdsLock(stateMux);
+        assert !schemaOp.started();
+
+        // Get current cache state.
+        SchemaProposeDiscoveryMessage msg = schemaOp.proposeMessage();
+
+        String space = msg.operation().space();
+
+        DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(space);
+
+        boolean cacheExists = cacheDesc != null && F.eq(msg.deploymentId(), 
cacheDesc.deploymentId());
+
+        boolean cacheRegistered = cacheExists && 
spaces.contains(CU.mask(space));
+
+        // Validate schema state and decide whether we should proceed or not.
+        SchemaAbstractOperation op = msg.operation();
+
+        QueryTypeDescriptorImpl type = null;
+        SchemaOperationException err;
+
+        boolean nop = false;
+
+        if (cacheExists) {
+            if (cacheRegistered) {
+                // If cache is started, we perform validation against real 
schema.
+                T3<QueryTypeDescriptorImpl, Boolean, SchemaOperationException> 
res = prepareChangeOnStartedCache(op);
+
+                assert res.get2() != null;
+
+                type = res.get1();
+                nop = res.get2();
+                err = res.get3();
+            }
+            else {
+                // If cache is not started yet, there is no schema. Take 
schema from cache descriptor and validate.
+                QuerySchema schema = cacheDesc.schema();
+
+                T2<Boolean, SchemaOperationException> res = 
prepareChangeOnNotStartedCache(op, schema);
+
+                assert res.get1() != null;
+
+                type = null;
+                nop = res.get1();
+                err = res.get2();
+            }
+        }
+        else
+            err = new 
SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, 
op.space());
+
+        // Start operation.
+        SchemaOperationWorker worker =
+            new SchemaOperationWorker(ctx, this, msg.deploymentId(), op, nop, 
err, cacheRegistered, type);
+
+        SchemaOperationManager mgr = new SchemaOperationManager(ctx, this, 
worker,
+            ctx.clientNode() ? null : coordinator());
+
+        schemaOp.manager(mgr);
+
+        mgr.start();
+
+        // Unwind pending IO messages.
+        if (!ctx.clientNode() && coordinator().isLocal())
+            unwindPendingMessages(schemaOp.id(), mgr);
+
+        // Schedule operation finish handling if needed.
+        if (schemaOp.hasFinishMessage())
+            schemaOp.doFinish();
+    }
+
     /**
      * @return {@code true} If indexing module is in classpath and 
successfully initialized.
      */
@@ -148,55 +642,115 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @return Indexing.
+     * @throws IgniteException If module is not enabled.
+     */
+    public GridQueryIndexing getIndexing() throws IgniteException {
+        checkxEnabled();
+
+        return idx;
+    }
+
+    /**
      * @param cctx Cache context.
+     * @param schema Initial schema.
      * @throws IgniteCheckedException If failed.
      */
-    @SuppressWarnings("deprecation")
-    private void initializeCache(GridCacheContext<?, ?> cctx) throws 
IgniteCheckedException {
+    @SuppressWarnings({"deprecation", "ThrowableResultOfMethodCallIgnored"})
+    private void initializeCache(GridCacheContext<?, ?> cctx, QuerySchema 
schema) throws IgniteCheckedException {
         String space = cctx.name();
 
-        CacheConfiguration<?,?> ccfg = cctx.config();
-
         // Prepare candidates.
         List<Class<?>> mustDeserializeClss = new ArrayList<>();
 
         Collection<QueryTypeCandidate> cands = new ArrayList<>();
 
-        if (!F.isEmpty(ccfg.getQueryEntities())) {
-            for (QueryEntity qryEntity : ccfg.getQueryEntities()) {
+        Collection<QueryEntity> qryEntities = schema.entities();
+
+        if (!F.isEmpty(qryEntities)) {
+            for (QueryEntity qryEntity : qryEntities) {
                 QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(space, 
cctx, qryEntity, mustDeserializeClss);
 
                 cands.add(cand);
             }
         }
 
-        // Register candidates.
-        idx.registerCache(space, cctx, cctx.config());
+        // Ensure that candidates have unique index names. Otherwise we will not be able to apply pending operations.
+        Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>();
+        Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>();
 
-        try {
-            for (QueryTypeCandidate cand : cands) {
-                QueryTypeIdKey typeId = cand.typeId();
-                QueryTypeIdKey altTypeId = cand.alternativeTypeId();
-                QueryTypeDescriptorImpl desc = cand.descriptor();
+        for (QueryTypeCandidate cand : cands) {
+            QueryTypeDescriptorImpl desc = cand.descriptor();
 
-                if (typesByName.putIfAbsent(new QueryTypeNameKey(space, 
desc.name()), desc) != null)
-                    throw new IgniteCheckedException("Type with name '" + 
desc.name() + "' already indexed " +
-                        "in cache '" + space + "'.");
+            QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), 
desc);
 
-                types.put(typeId, desc);
+            if (oldDesc != null)
+                throw new IgniteException("Duplicate table name [tblName=" + 
desc.tableName() +
+                    ", type1=" + desc.name() + ", type2=" + oldDesc.name() + 
']');
 
-                if (altTypeId != null)
-                    types.put(altTypeId, desc);
+            for (String idxName : desc.indexes().keySet()) {
+                oldDesc = idxTypMap.put(idxName, desc);
 
-                idx.registerType(space, desc);
+                if (oldDesc != null)
+                    throw new IgniteException("Duplicate index name [idxName=" 
+ idxName +
+                        ", type1=" + desc.name() + ", type2=" + oldDesc.name() 
+ ']');
             }
         }
-        catch (IgniteCheckedException | RuntimeException e) {
-            unregisterCache0(space);
 
-            throw e;
+        // Apply pending operation which could have been completed as no-op at 
this point. There could be only one
+        // in-flight operation for a cache.
+        synchronized (stateMux) {
+            if (disconnected)
+                return;
+
+            for (SchemaOperation op : schemaOps.values()) {
+                if (F.eq(op.proposeMessage().deploymentId(), 
cctx.dynamicDeploymentId())) {
+                    if (op.started()) {
+                        SchemaOperationWorker worker = op.manager().worker();
+
+                        assert !worker.cacheRegistered();
+
+                        if (!worker.nop()) {
+                            IgniteInternalFuture fut = worker.future();
+
+                            assert fut.isDone();
+
+                            if (fut.error() == null) {
+                                SchemaAbstractOperation op0 = 
op.proposeMessage().operation();
+
+                                if (op0 instanceof SchemaIndexCreateOperation) 
{
+                                    SchemaIndexCreateOperation opCreate = 
(SchemaIndexCreateOperation)op0;
+
+                                    QueryTypeDescriptorImpl typeDesc = 
tblTypMap.get(opCreate.tableName());
+
+                                    assert typeDesc != null;
+
+                                    
QueryUtils.processDynamicIndexChange(opCreate.indexName(), opCreate.index(),
+                                        typeDesc);
+                                }
+                                else if (op0 instanceof 
SchemaIndexDropOperation) {
+                                    SchemaIndexDropOperation opDrop = 
(SchemaIndexDropOperation)op0;
+
+                                    QueryTypeDescriptorImpl typeDesc = 
idxTypMap.get(opDrop.indexName());
+
+                                    assert typeDesc != null;
+
+                                    
QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc);
+                                }
+                                else
+                                    assert false;
+                            }
+                        }
+                    }
+
+                    break;
+                }
+            }
         }
 
+        // Ready to register at this point.
+        registerCache0(space, cctx, cands);
+
         // Warn about possible implicit deserialization.
         if (!mustDeserializeClss.isEmpty()) {
             U.warn(log, "Some classes in query configuration cannot be written 
in binary format " +
@@ -209,46 +763,41 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        super.onKernalStop(cancel);
-
-        if (cancel && idx != null)
-            try {
-                while (!busyLock.tryBlock(500))
-                    idx.cancelAllQueries();
-
-                return;
-            }
-            catch (InterruptedException ignored) {
-                U.warn(log, "Interrupted while waiting for active queries 
cancellation.");
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws 
IgniteCheckedException {
+        Collection<SchemaOperationClientFuture> futs;
 
-                Thread.currentThread().interrupt();
-            }
+        synchronized (stateMux) {
+            disconnected = true;
+            exchangeReady = false;
 
-        busyLock.block();
-    }
+            // Clear client futures.
+            futs = new ArrayList<>(schemaCliFuts.values());
 
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) throws IgniteCheckedException {
-        super.stop(cancel);
+            schemaCliFuts.clear();
 
-        if (idx != null)
-            idx.stop();
+            // Clear operations data.
+            activeProposals.clear();
+            schemaOps.clear();
+        }
 
-        U.closeQuiet(qryDetailMetricsEvictTask);
-    }
+        // Complete client futures outside of synchronized block because they 
may have listeners/chains.
+        for (SchemaOperationClientFuture fut : futs)
+            fut.onDone(new SchemaOperationException("Client node is 
disconnected (operation result is unknown)."));
 
-    /** {@inheritDoc} */
-    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws 
IgniteCheckedException {
         if (idx != null)
             idx.onDisconnected(reconnectFut);
     }
 
     /**
+     * Handle cache start. Invoked either from 
GridCacheProcessor.onKernalStart() method or from exchange worker.
+     * When called for the first time, we initialize topology thus 
understanding whether current node is coordinator
+     * or not.
+     *
      * @param cctx Cache context.
+     * @param schema Index states.
      * @throws IgniteCheckedException If failed.
      */
-    public void onCacheStart(GridCacheContext cctx) throws 
IgniteCheckedException {
+    public void onCacheStart(GridCacheContext cctx, QuerySchema schema) throws 
IgniteCheckedException {
         if (idx == null)
             return;
 
@@ -258,45 +807,506 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
         cctx.shared().database().checkpointReadLock();
 
         try {
-            initializeCache(cctx);
+            initializeCache(cctx, schema);
         }
         finally {
             cctx.shared().database().checkpointReadUnlock();
 
-            busyLock.leaveBusy();
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     */
+    public void onCacheStop(GridCacheContext cctx) {
+        if (idx == null)
+            return;
+
+        if (!busyLock.enterBusy())
+            return;
+
+        try {
+            unregisterCache0(cctx.name());
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @return Skip field lookup flag.
+     */
+    public boolean skipFieldLookup() {
+        return skipFieldLookup;
+    }
+
+    /**
+     * @param skipFieldLookup Skip field lookup flag.
+     */
+    public void skipFieldLookup(boolean skipFieldLookup) {
+        this.skipFieldLookup = skipFieldLookup;
+    }
+
+    /**
+     * Handle custom discovery message.
+     *
+     * @param msg Message.
+     */
+    public void onDiscovery(SchemaAbstractDiscoveryMessage msg) {
+        IgniteUuid id = msg.id();
+
+        if (!dscoMsgIdHist.add(id)) {
+            U.warn(log, "Received duplicate schema custom discovery message 
(will ignore) [opId=" +
+                msg.operation().id() + ", msg=" + msg  +']');
+
+            return;
+        }
+
+        if (msg instanceof SchemaProposeDiscoveryMessage) {
+            SchemaProposeDiscoveryMessage msg0 = 
(SchemaProposeDiscoveryMessage)msg;
+
+            boolean exchange = onSchemaProposeDiscovery(msg0);
+
+            msg0.exchange(exchange);
+        }
+        else if (msg instanceof SchemaFinishDiscoveryMessage) {
+            SchemaFinishDiscoveryMessage msg0 = 
(SchemaFinishDiscoveryMessage)msg;
+
+            onSchemaFinishDiscovery(msg0);
+        }
+        else
+            U.warn(log, "Received unsupported schema custom discovery message 
(will ignore) [opId=" +
+                msg.operation().id() + ", msg=" + msg  +']');
+    }
+
+    /**
+     * Prepare change on started cache.
+     *
+     * @param op Operation.
+     * @return Result: affected type, nop flag, error.
+     */
+    private T3<QueryTypeDescriptorImpl, Boolean, SchemaOperationException> 
prepareChangeOnStartedCache(
+        SchemaAbstractOperation op) {
+        QueryTypeDescriptorImpl type = null;
+        boolean nop = false;
+        SchemaOperationException err = null;
+
+        String space = op.space();
+
+        if (op instanceof SchemaIndexCreateOperation) {
+            SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation) op;
+
+            QueryIndex idx = op0.index();
+
+            // Make sure table exists.
+            String tblName = op0.tableName();
+
+            type = type(space, tblName);
+
+            if (type == null)
+                err = new 
SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, 
tblName);
+            else {
+                // Make sure that index can be applied to the given table.
+                for (String idxField : idx.getFieldNames()) {
+                    if (!type.fields().containsKey(idxField)) {
+                        err = new 
SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND,
+                            idxField);
+
+                        break;
+                    }
+                }
+            }
+
+            // Check conflict with other indexes.
+            if (err == null) {
+                String idxName = op0.index().getName();
+
+                QueryIndexKey idxKey = new QueryIndexKey(space, idxName);
+
+                if (idxs.get(idxKey) != null) {
+                    if (op0.ifNotExists())
+                        nop = true;
+                    else
+                        err = new 
SchemaOperationException(SchemaOperationException.CODE_INDEX_EXISTS, idxName);
+                }
+            }
+        }
+        else if (op instanceof SchemaIndexDropOperation) {
+            SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) op;
+
+            String idxName = op0.indexName();
+
+            QueryIndexDescriptorImpl oldIdx = idxs.get(new 
QueryIndexKey(space, idxName));
+
+            if (oldIdx == null) {
+                if (op0.ifExists())
+                    nop = true;
+                else
+                    err = new 
SchemaOperationException(SchemaOperationException.CODE_INDEX_NOT_FOUND, 
idxName);
+            }
+            else
+                type = oldIdx.typeDescriptor();
+        }
+        else
+            err = new SchemaOperationException("Unsupported operation: " + op);
+
+        return new T3<>(type, nop, err);
+    }
+
+    /**
+     * Prepare operation on non-started cache.
+     *
+     * @param op Operation.
+     * @param schema Known cache schema.
+     * @return Result: nop flag, error.
+     */
+    private T2<Boolean, SchemaOperationException> 
prepareChangeOnNotStartedCache(SchemaAbstractOperation op,
+        QuerySchema schema) {
+        boolean nop = false;
+        SchemaOperationException err = null;
+
+        // Build table and index maps.
+        Map<String, QueryEntity> tblMap = new HashMap<>();
+        Map<String, T2<QueryEntity, QueryIndex>> idxMap = new HashMap<>();
+
+        for (QueryEntity entity : schema.entities()) {
+            String tblName = QueryUtils.tableName(entity);
+
+            QueryEntity oldEntity = tblMap.put(tblName, entity);
+
+            if (oldEntity != null) {
+                err = new SchemaOperationException("Invalid schema state 
(duplicate table found): " + tblName);
+
+                break;
+            }
+
+            for (QueryIndex entityIdx : entity.getIndexes()) {
+                String idxName = QueryUtils.indexName(entity, entityIdx);
+
+                T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.put(idxName, 
new T2<>(entity, entityIdx));
+
+                if (oldIdxEntity != null) {
+                    err = new SchemaOperationException("Invalid schema state 
(duplicate index found): " +
+                        idxName);
+
+                    break;
+                }
+            }
+
+            if (err != null)
+                break;
+        }
+
+        // Now check whether operation can be applied to schema.
+        if (op instanceof SchemaIndexCreateOperation) {
+            SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
+
+            String idxName = op0.indexName();
+
+            T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.get(idxName);
+
+            if (oldIdxEntity == null) {
+                String tblName = op0.tableName();
+
+                QueryEntity oldEntity = tblMap.get(tblName);
+
+                if (oldEntity == null)
+                    err = new 
SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, 
tblName);
+                else {
+                    for (String fieldName : op0.index().getFields().keySet()) {
+                        Set<String> oldEntityFields = new 
HashSet<>(oldEntity.getFields().keySet());
+
+                        for (Map.Entry<String, String> alias : 
oldEntity.getAliases().entrySet()) {
+                            oldEntityFields.remove(alias.getKey());
+                            oldEntityFields.add(alias.getValue());
+                        }
+
+                        if (!oldEntityFields.contains(fieldName)) {
+                            err = new 
SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND,
+                                fieldName);
+
+                            break;
+                        }
+                    }
+                }
+            }
+            else {
+                if (op0.ifNotExists())
+                    nop = true;
+                else
+                    err = new 
SchemaOperationException(SchemaOperationException.CODE_INDEX_EXISTS, idxName);
+            }
+        }
+        else if (op instanceof SchemaIndexDropOperation) {
+            SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op;
+
+            String idxName = op0.indexName();
+
+            T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.get(idxName);
+
+            if (oldIdxEntity == null) {
+                if (op0.ifExists())
+                    nop = true;
+                else
+                    err = new 
SchemaOperationException(SchemaOperationException.CODE_INDEX_NOT_FOUND, 
idxName);
+            }
+        }
+        else
+            err = new SchemaOperationException("Unsupported operation: " + op);
+
+        return new T2<>(nop, err);
+    }
+
+    /**
+     * Invoked when coordinator finished ensuring that all participants are 
ready.
+     *
+     * @param op Operation.
+     * @param err Error (if any).
+     */
+    public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable 
SchemaOperationException err) {
+        synchronized (stateMux) {
+            SchemaFinishDiscoveryMessage msg = new 
SchemaFinishDiscoveryMessage(op, err);
+
+            try {
+                ctx.discovery().sendCustomEvent(msg);
+            }
+            catch (Exception e) {
+                // Failed to send finish message over discovery. This is 
something unrecoverable.
+                U.warn(log, "Failed to send schema finish discovery message 
[opId=" + op.id() + ']', e);
+            }
+        }
+    }
+
+    /**
+     * Get current coordinator node.
+     *
+     * @return Coordinator node.
+     */
+    private ClusterNode coordinator() {
+        assert !ctx.clientNode();
+
+        synchronized (stateMux) {
+            if (crd == null) {
+                ClusterNode crd0 = null;
+
+                for (ClusterNode node : ctx.discovery().aliveServerNodes()) {
+                    if (crd0 == null || crd0.order() > node.order())
+                        crd0 = node;
+                }
+
+                assert crd0 != null;
+
+                crd = crd0;
+            }
+
+            return crd;
+        }
+    }
+
+    /**
+     * Get rid of stale IO messages received from other nodes which joined while 
operation was in progress.
+     *
+     * @param opId Operation ID.
+     */
+    private void cleanStaleStatusMessages(UUID opId) {
+        Iterator<SchemaOperationStatusMessage> it = pendingMsgs.iterator();
+
+        while (it.hasNext()) {
+            SchemaOperationStatusMessage statusMsg = it.next();
+
+            if (F.eq(opId, statusMsg.operationId())) {
+                it.remove();
+
+                if (log.isDebugEnabled())
+                    log.debug("Dropped operation status message because it is 
already completed [opId=" + opId +
+                        ", rmtNode=" + statusMsg.senderNodeId() + ']');
+            }
+        }
+    }
+
+    /**
+     * Apply positive index operation result.
+     *
+     * @param op Operation.
+     * @param type Type descriptor (if available).
+     */
+    public void onLocalOperationFinished(SchemaAbstractOperation op, @Nullable 
QueryTypeDescriptorImpl type) {
+        synchronized (stateMux) {
+            if (disconnected)
+                return;
+
+            // No need to apply anything to obsolete type.
+            if (type == null || type.obsolete()) {
+                if (log.isDebugEnabled())
+                    log.debug("Local operation finished, but type descriptor 
is either missing or obsolete " +
+                        "(will ignore) [opId=" + op.id() + ']');
+
+                return;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Local operation finished successfully [opId=" + 
op.id() + ']');
+
+            try {
+                if (op instanceof SchemaIndexCreateOperation) {
+                    SchemaIndexCreateOperation op0 = 
(SchemaIndexCreateOperation)op;
+
+                    QueryUtils.processDynamicIndexChange(op0.indexName(), 
op0.index(), type);
+
+                    QueryIndexDescriptorImpl idxDesc = 
type.index(op0.indexName());
+
+                    QueryIndexKey idxKey = new QueryIndexKey(op.space(), 
op0.indexName());
+
+                    idxs.put(idxKey, idxDesc);
+                }
+                else {
+                    assert op instanceof SchemaIndexDropOperation;
+
+                    SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) 
op;
+
+                    QueryUtils.processDynamicIndexChange(op0.indexName(), 
null, type);
+
+                    QueryIndexKey idxKey = new QueryIndexKey(op.space(), 
op0.indexName());
+
+                    idxs.remove(idxKey);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                U.warn(log, "Failed to finish index operation [opId=" + 
op.id() + " op=" + op + ']', e);
+            }
+        }
+    }
+
+    /**
+     * Handle node leave.
+     *
+     * @param node Node.
+     */
+    public void onNodeLeave(ClusterNode node) {
+        synchronized (stateMux) {
+            // Clients do not send status messages and are never coordinators.
+            if (ctx.clientNode())
+                return;
+
+            ClusterNode crd0 = coordinator();
+
+            if (F.eq(node.id(), crd0.id())) {
+                crd = null;
+
+                crd0 = coordinator();
+            }
+
+            for (SchemaOperation op : schemaOps.values()) {
+                if (op.started()) {
+                    op.manager().onNodeLeave(node.id(), crd0);
+
+                    if (crd0.isLocal())
+                        unwindPendingMessages(op.id(), op.manager());
+                }
+            }
+        }
+    }
+
+    /**
+     * Process index operation.
+     *
+     * @param op Operation.
+     * @param type Type descriptor.
+     * @param depId Cache deployment ID.
+     * @param cancelTok Cancel token.
+     * @throws SchemaOperationException If failed.
+     */
+    public void processIndexOperationLocal(SchemaAbstractOperation op, 
QueryTypeDescriptorImpl type, IgniteUuid depId,
+        SchemaIndexOperationCancellationToken cancelTok) throws 
SchemaOperationException {
+        if (log.isDebugEnabled())
+            log.debug("Started local index operation [opId=" + op.id() + ']');
+
+        String space = op.space();
+
+        GridCacheAdapter cache = ctx.cache().internalCache(op.space());
+
+        if (cache == null || !F.eq(depId, 
cache.context().dynamicDeploymentId()))
+            throw new 
SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, 
op.space());
+
+        try {
+            if (op instanceof SchemaIndexCreateOperation) {
+                SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation) 
op;
+
+                QueryIndexDescriptorImpl idxDesc = 
QueryUtils.createIndexDescriptor(type, op0.index());
+
+                SchemaIndexCacheVisitor visitor =
+                    new SchemaIndexCacheVisitorImpl(this, cache.context(), 
space, op0.tableName(), cancelTok);
+
+                idx.dynamicIndexCreate(space, op0.tableName(), idxDesc, 
op0.ifNotExists(), visitor);
+            }
+            else if (op instanceof SchemaIndexDropOperation) {
+                SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) op;
+
+                idx.dynamicIndexDrop(space, op0.indexName(), op0.ifExists());
+            }
+            else
+                throw new SchemaOperationException("Unsupported operation: " + 
op);
+        }
+        catch (Exception e) {
+            if (e instanceof SchemaOperationException)
+                throw (SchemaOperationException)e;
+            else
+                throw new SchemaOperationException("Schema change operation 
failed: " + e.getMessage(), e);
         }
     }
 
     /**
+     * Register cache in indexing SPI.
+     *
+     * @param space Space.
      * @param cctx Cache context.
+     * @param cands Candidates.
+     * @throws IgniteCheckedException If failed.
      */
-    public void onCacheStop(GridCacheContext cctx) {
-        if (idx == null)
-            return;
+    private void registerCache0(String space, GridCacheContext<?, ?> cctx, 
Collection<QueryTypeCandidate> cands)
+        throws IgniteCheckedException {
+        synchronized (stateMux) {
+            idx.registerCache(space, cctx, cctx.config());
 
-        if (!busyLock.enterBusy())
-            return;
+            try {
+                for (QueryTypeCandidate cand : cands) {
+                    QueryTypeIdKey typeId = cand.typeId();
+                    QueryTypeIdKey altTypeId = cand.alternativeTypeId();
+                    QueryTypeDescriptorImpl desc = cand.descriptor();
 
-        try {
-            unregisterCache0(cctx.name());
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
+                    if (typesByName.putIfAbsent(new QueryTypeNameKey(space, 
desc.name()), desc) != null)
+                        throw new IgniteCheckedException("Type with name '" + 
desc.name() + "' already indexed " +
+                            "in cache '" + space + "'.");
 
-    /**
-     * @return Skip field lookup flag.
-     */
-    public boolean skipFieldLookup() {
-        return skipFieldLookup;
-    }
+                    types.put(typeId, desc);
 
-    /**
-     * @param skipFieldLookup Skip field lookup flag.
-     */
-    public void skipFieldLookup(boolean skipFieldLookup) {
-        this.skipFieldLookup = skipFieldLookup;
+                    if (altTypeId != null)
+                        types.put(altTypeId, desc);
+
+                    for (QueryIndexDescriptorImpl idx : desc.indexes0()) {
+                        QueryIndexKey idxKey = new QueryIndexKey(space, 
idx.name());
+
+                        QueryIndexDescriptorImpl oldIdx = 
idxs.putIfAbsent(idxKey, idx);
+
+                        if (oldIdx != null) {
+                            throw new IgniteException("Duplicate index name 
[space=" + space +
+                                ", idxName=" + idx.name() + ", existingTable=" 
+ oldIdx.typeDescriptor().tableName() +
+                                ", table=" + desc.tableName() + ']');
+                        }
+                    }
+
+                    idx.registerType(space, desc);
+                }
+
+                spaces.add(CU.mask(space));
+            }
+            catch (IgniteCheckedException | RuntimeException e) {
+                unregisterCache0(space);
+
+                throw e;
+            }
+        }
     }
 
     /**
@@ -307,13 +1317,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     private void unregisterCache0(String space) {
         assert idx != null;
 
-        try {
-            idx.unregisterCache(space);
-        }
-        catch (Exception e) {
-            U.error(log, "Failed to clear indexing on cache unregister (will 
ignore): " + space, e);
-        }
-        finally {
+        synchronized (stateMux) {
+            // Clear types.
             Iterator<Map.Entry<QueryTypeIdKey, QueryTypeDescriptorImpl>> it = 
types.entrySet().iterator();
 
             while (it.hasNext()) {
@@ -323,9 +1328,79 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
                     it.remove();
 
                     typesByName.remove(new QueryTypeNameKey(space, 
entry.getValue().name()));
+
+                    entry.getValue().markObsolete();
                 }
             }
+
+            // Clear indexes.
+            Iterator<Map.Entry<QueryIndexKey, QueryIndexDescriptorImpl>> idxIt 
= idxs.entrySet().iterator();
+
+            while (idxIt.hasNext()) {
+                Map.Entry<QueryIndexKey, QueryIndexDescriptorImpl> idxEntry = 
idxIt.next();
+
+                QueryIndexKey idxKey = idxEntry.getKey();
+
+                if (F.eq(space, idxKey.space()))
+                    idxIt.remove();
+            }
+
+            // Notify in-progress index operations.
+            for (SchemaOperation op : schemaOps.values()) {
+                if (op.started())
+                    op.manager().worker().cancel();
+            }
+
+            // Notify indexing.
+            try {
+                idx.unregisterCache(space);
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to clear indexing on cache unregister 
(will ignore): " + space, e);
+            }
+
+            spaces.remove(CU.mask(space));
+        }
+    }
+
+    /**
+     * Check whether provided key and value belongs to expected space and 
table.
+     *
+     * @param cctx Target cache context.
+     * @param expSpace Expected space.
+     * @param expTblName Expected table name.
+     * @param key Key.
+     * @param val Value.
+     * @return {@code True} if this key-value pair belongs to expected 
space/table, {@code false} otherwise or
+     *     if space or table doesn't exist.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public boolean belongsToTable(GridCacheContext cctx, String expSpace, 
String expTblName, KeyCacheObject key,
+        CacheObject val) throws IgniteCheckedException {
+        QueryTypeDescriptorImpl desc = type(expSpace, val);
+
+        if (desc == null)
+            return false;
+
+        if (!F.eq(expTblName, desc.tableName()))
+            return false;
+
+        if (!cctx.cacheObjects().isBinaryObject(val)) {
+            Class<?> valCls = val.value(cctx.cacheObjectContext(), 
false).getClass();
+
+            if (!desc.valueClass().isAssignableFrom(valCls))
+                return false;
         }
+
+        if (!cctx.cacheObjects().isBinaryObject(key)) {
+            Class<?> keyCls = key.value(cctx.cacheObjectContext(), 
false).getClass();
+
+            if (!desc.keyClass().isAssignableFrom(keyCls))
+                return false;
+        }
+
+        return true;
     }
 
     /**
@@ -457,7 +1532,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             if (desc == null)
                 return;
 
-            idx.store(space, desc, key, partId, val, ver, expirationTime, 
link);
+            idx.store(space, desc.name(), key, partId, val, ver, 
expirationTime, link);
         }
         finally {
             busyLock.leaveBusy();
@@ -518,6 +1593,30 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Gets type descriptor for space by given object's type.
+     *
+     * @param space Space name.
+     * @param val Object to determine type for.
+     * @return Type descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private QueryTypeDescriptorImpl type(@Nullable String space, CacheObject 
val) throws IgniteCheckedException {
+        CacheObjectContext coctx = cacheObjectContext(space);
+
+        QueryTypeIdKey id;
+
+        boolean binaryVal = ctx.cacheObjects().isBinaryObject(val);
+
+        if (binaryVal)
+            id = new QueryTypeIdKey(space, ctx.cacheObjects().typeId(val));
+        else
+            id = new QueryTypeIdKey(space, val.value(coctx, false).getClass());
+
+        return types.get(id);
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     private void checkEnabled() throws IgniteCheckedException {
@@ -637,9 +1736,9 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
                     @Override public QueryCursor<Cache.Entry<K, V>> applyx() 
throws IgniteCheckedException {
                         String type = qry.getType();
 
-                        QueryTypeDescriptorImpl typeDesc = type(cctx.name(), 
type);
+                        String typeName = typeName(cctx.name(), type);
 
-                        qry.setType(typeDesc.name());
+                        qry.setType(typeName);
 
                         sendQueryExecutedEvent(
                             qry.getSql(),
@@ -682,6 +1781,80 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Entry point for index create procedure.
+     *
+     * @param space Space name.
+     * @param tblName Table name.
+     * @param idx Index.
+     * @param ifNotExists When set to {@code true} operation will not fail if 
index already exists.
+     * @return Future completed when index is created.
+     */
+    public IgniteInternalFuture<?> dynamicIndexCreate(String space, String 
tblName, QueryIndex idx,
+        boolean ifNotExists) {
+        SchemaAbstractOperation op = new 
SchemaIndexCreateOperation(UUID.randomUUID(), space, tblName, idx, ifNotExists);
+
+        return startIndexOperationDistributed(op);
+    }
+
+    /**
+     * Entry point for index drop procedure.
+     *
+     * @param idxName Index name.
+     * @param ifExists When set to {@code true} operation will not fail if index 
doesn't exist.
+     * @return Future completed when index is dropped.
+     */
+    public IgniteInternalFuture<?> dynamicIndexDrop(String space, String 
idxName, boolean ifExists) {
+        SchemaAbstractOperation op = new 
SchemaIndexDropOperation(UUID.randomUUID(), space, idxName, ifExists);
+
+        return startIndexOperationDistributed(op);
+    }
+
+    /**
+     * Start distributed index change operation.
+     *
+     * @param op Operation.
+     * @return Future.
+     */
+    private IgniteInternalFuture<?> 
startIndexOperationDistributed(SchemaAbstractOperation op) {
+        SchemaOperationClientFuture fut = new 
SchemaOperationClientFuture(op.id());
+
+        SchemaOperationClientFuture oldFut = schemaCliFuts.put(op.id(), fut);
+
+        assert oldFut == null;
+
+        try {
+            ctx.discovery().sendCustomEvent(new 
SchemaProposeDiscoveryMessage(op));
+
+            if (log.isDebugEnabled())
+                log.debug("Sent schema propose discovery message [opId=" + 
op.id() + ", op=" + op + ']');
+
+            boolean disconnected0;
+
+            synchronized (stateMux) {
+                disconnected0 = disconnected;
+            }
+
+            if (disconnected0) {
+                fut.onDone(new SchemaOperationException("Client node is 
disconnected (operation result is unknown)."));
+
+                schemaCliFuts.remove(op.id());
+            }
+        }
+        catch (Exception e) {
+            if (e instanceof SchemaOperationException)
+                fut.onDone(e);
+            else {
+                fut.onDone(new SchemaOperationException("Failed to start 
schema change operation due to " +
+                    "unexpected exception [opId=" + op.id() + ", op=" + op + 
']', e));
+            }
+
+            schemaCliFuts.remove(op.id());
+        }
+
+        return fut;
+    }
+
+    /**
      * @param sqlQry Sql query.
      * @param params Params.
      */
@@ -704,14 +1877,15 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param schema Schema.
+     *
+     * @param space Space name.
      * @param sql Query.
      * @return {@link PreparedStatement} from underlying engine to supply 
metadata to Prepared - most likely H2.
      */
-    public PreparedStatement prepareNativeStatement(String schema, String sql) 
throws SQLException {
+    public PreparedStatement prepareNativeStatement(String space, String sql) 
throws SQLException {
         checkxEnabled();
 
-        return idx.prepareNativeStatement(schema, sql);
+        return idx.prepareNativeStatement(space, sql);
     }
 
     /**
@@ -838,9 +2012,9 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             return executeQuery(GridCacheQueryType.TEXT, clause, cctx,
                 new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, 
V>>>() {
                     @Override public GridCloseableIterator<IgniteBiTuple<K, 
V>> applyx() throws IgniteCheckedException {
-                        QueryTypeDescriptorImpl type = type(space, resType);
+                        String typeName = typeName(space, resType);
 
-                        return idx.queryLocalText(space, clause, type, 
filters);
+                        return idx.queryLocalText(space, clause, typeName, 
filters);
                     }
                 }, true);
         }
@@ -924,20 +2098,35 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * Gets type descriptor for space and type name.
+     * Get type descriptor for the given space and table name.
+     * @param space Space.
+     * @param tblName Table name.
+     * @return Type (if any).
+     */
+    @Nullable private QueryTypeDescriptorImpl type(@Nullable String space, 
String tblName) {
+        for (QueryTypeDescriptorImpl type : types.values()) {
+            if (F.eq(space, type.space()) && F.eq(tblName, type.tableName()))
+                return type;
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets type name for provided space and type name if type is still valid.
      *
      * @param space Space name.
      * @param typeName Type name.
      * @return Type descriptor.
      * @throws IgniteCheckedException If failed.
      */
-    public QueryTypeDescriptorImpl type(@Nullable String space, String 
typeName) throws IgniteCheckedException {
+    private String typeName(@Nullable String space, String typeName) throws 
IgniteCheckedException {
         QueryTypeDescriptorImpl type = typesByName.get(new 
QueryTypeNameKey(space, typeName));
 
         if (type == null)
             throw new IgniteCheckedException("Failed to find SQL table for 
type: " + typeName);
 
-        return type;
+        return type.name();
     }
 
     /**
@@ -972,7 +2161,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
 
             throw (IgniteCheckedException)err;
         }
-        catch (CacheException e) {
+        catch (CacheException | IgniteException e) {
             err = e;
 
             throw e;
@@ -998,6 +2187,146 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Send status message to coordinator node.
+     *
+     * @param destNodeId Destination node ID.
+     * @param opId Operation ID.
+     * @param err Error.
+     */
+    public void sendStatusMessage(UUID destNodeId, UUID opId, 
SchemaOperationException err) {
+        if (log.isDebugEnabled())
+            log.debug("Sending schema operation status message [opId=" + opId 
+ ", crdNode=" + destNodeId +
+                ", err=" + err + ']');
+
+        try {
+            byte[] errBytes = marshalSchemaError(opId, err);
+
+            SchemaOperationStatusMessage msg = new 
SchemaOperationStatusMessage(opId, errBytes);
+
+            // Messages must go to dedicated schema pool. We cannot push them 
to query pool because in this case
+            // they could be blocked with other query requests.
+            ctx.io().sendToGridTopic(destNodeId, TOPIC_SCHEMA, msg, 
SCHEMA_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send schema status response [opId=" + 
opId + ", destNodeId=" + destNodeId +
+                    ", err=" + e + ']');
+        }
+    }
+
+    /**
+     * Process status message.
+     *
+     * @param msg Status message.
+     */
+    private void processStatusMessage(SchemaOperationStatusMessage msg) {
+        synchronized (stateMux) {
+            if (completedOpIds.contains(msg.operationId())) {
+                // Received message from a node which joined topology in the 
middle of operation execution.
+                if (log.isDebugEnabled())
+                    log.debug("Received status message for completed operation 
(will ignore) [" +
+                        "opId=" + msg.operationId() + ", sndNodeId=" + 
msg.senderNodeId() + ']');
+
+                return;
+            }
+
+            UUID opId = msg.operationId();
+
+            SchemaProposeDiscoveryMessage proposeMsg = 
activeProposals.get(opId);
+
+            if (proposeMsg != null) {
+                SchemaOperation op = schemaOps.get(proposeMsg.schemaKey());
+
+                if (op != null && F.eq(op.id(), opId) && op.started() && 
coordinator().isLocal()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received status message [opId=" + 
msg.operationId() +
+                            ", sndNodeId=" + msg.senderNodeId() + ']');
+
+                    op.manager().onNodeFinished(msg.senderNodeId(), 
unmarshalSchemaError(msg.errorBytes()));
+
+                    return;
+                }
+            }
+
+            // Put to pending set if operation is not visible/ready yet.
+            pendingMsgs.add(msg);
+
+            if (log.isDebugEnabled())
+                log.debug("Received status message (added to pending set) 
[opId=" + msg.operationId() +
+                    ", sndNodeId=" + msg.senderNodeId() + ']');
+        }
+    }
+
+    /**
+     * Hand pending status messages of the given operation over to its manager.
+     * Delivered messages are removed from the pending set; caller must hold {@code stateMux}.
+     * @param opId Operation ID.
+     * @param mgr Manager that receives the matching messages.
+     */
+    private void unwindPendingMessages(UUID opId, SchemaOperationManager mgr) {
+        assert Thread.holdsLock(stateMux);
+
+        Iterator<SchemaOperationStatusMessage> it = pendingMsgs.iterator();
+
+        while (it.hasNext()) {
+            SchemaOperationStatusMessage msg = it.next();
+
+            if (F.eq(msg.operationId(), opId)) {
+                mgr.onNodeFinished(msg.senderNodeId(), 
unmarshalSchemaError(msg.errorBytes()));
+
+                it.remove();
+            }
+        }
+    }
+
+    /**
+     * Marshal schema error, substituting a generic error if it cannot be serialized.
+     * @param opId Operation ID (used in the warning and in the substitute error).
+     * @param err Error.
+     * @return Error bytes or {@code null} if there is no error.
+     */
+    @Nullable private byte[] marshalSchemaError(UUID opId, @Nullable 
SchemaOperationException err) {
+        if (err == null)
+            return null;
+
+        try {
+            return U.marshal(marsh, err);
+        }
+        catch (Exception e) {
+            U.warn(log, "Failed to marshal schema operation error [opId=" + 
opId + ", err=" + err + ']', e);
+
+            try {
+                return U.marshal(marsh, new 
SchemaOperationException("Operation failed, but error cannot be " +
+                    "serialized (see local node log for more details) [opId=" 
+ opId + ", nodeId=" +
+                    ctx.localNodeId() + ']'));
+            }
+            catch (Exception e0) {
+                assert false; // Impossible situation.
+
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Unmarshal schema error.
+     *
+     * @param errBytes Error bytes.
+     * @return Error.
+     */
+    @Nullable private SchemaOperationException unmarshalSchemaError(@Nullable 
byte[] errBytes) {
+        if (errBytes == null)
+            return null;
+
+        try {
+            return U.unmarshal(marsh, errBytes, 
U.resolveClassLoader(ctx.config()));
+        }
+        catch (Exception e) {
+            return new SchemaOperationException("Operation failed, but error 
cannot be deserialized.");
+        }
+    }
+
+    /**
      * @param ver Version.
      */
     public static void 
setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) {
@@ -1010,4 +2339,160 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     public static AffinityTopologyVersion getRequestAffinityTopologyVersion() {
         return requestTopVer.get();
     }
+
+    /**
+     * Schema operation.
+     */
+    private class SchemaOperation {
+        /** Original propose msg. */
+        private final SchemaProposeDiscoveryMessage proposeMsg;
+
+        /** Next schema operation. */
+        private SchemaOperation next;
+
+        /** Operation manager. */
+        private SchemaOperationManager mgr;
+
+        /** Finish message. */
+        private SchemaFinishDiscoveryMessage finishMsg;
+
+        /** Finish guard. */
+        private final AtomicBoolean finishGuard = new AtomicBoolean();
+
+        /**
+         * Constructor.
+         *
+         * @param proposeMsg Original propose message.
+         */
+        public SchemaOperation(SchemaProposeDiscoveryMessage proposeMsg) {
+            this.proposeMsg = proposeMsg;
+        }
+
+        /**
+         * @return Operation ID.
+         */
+        public UUID id() {
+            return proposeMsg.operation().id();
+        }
+
+        /**
+         * @return Original propose message.
+         */
+        public SchemaProposeDiscoveryMessage proposeMessage() {
+            return proposeMsg;
+        }
+
+        /**
+         * @return Next schema operation.
+         */
+        @Nullable public SchemaOperation next() {
+            return next;
+        }
+
+        /**
+         * @param next Next schema operation.
+         */
+        public void next(SchemaOperation next) {
+            this.next = next;
+        }
+
+        /**
+         * @param finishMsg Finish message.
+         */
+        public void finishMessage(SchemaFinishDiscoveryMessage finishMsg) {
+            this.finishMsg = finishMsg;
+        }
+
+        /**
+         * @return {@code True} if finish request already received.
+         */
+        public boolean hasFinishMessage() {
+            return finishMsg != null;
+        }
+
+        /**
+         * Handle finish message.
+         */
+        @SuppressWarnings("unchecked")
+        public void doFinish() {
+            assert started();
+
+            if (!finishGuard.compareAndSet(false, true))
+                return;
+
+            final UUID opId = id();
+            final SchemaKey key = proposeMsg.schemaKey();
+
+            // Operation might be still in progress on client nodes which are 
not tracked by coordinator,
+            // so we chain to operation future instead of doing synchronous 
unwind.
+            mgr.worker().future().listen(new 
IgniteInClosure<IgniteInternalFuture>() {
+                @Override public void apply(IgniteInternalFuture fut) {
+                    synchronized (stateMux) {
+                        SchemaOperation op = schemaOps.remove(key);
+
+                        assert op != null;
+                        assert F.eq(op.id(), opId);
+
+                        // Chain to the next operation (if any).
+                        final SchemaOperation nextOp = op.next();
+
+                        if (nextOp != null) {
+                            schemaOps.put(key, nextOp);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Next schema change operation 
started [opId=" + nextOp.id() + ']');
+
+                            assert !nextOp.started();
+
+                            // Cannot execute operation synchronously because 
it may cause starvation in exchange
+                            // thread under load. Hence, moving short-lived 
operation to separate worker.
+                            new IgniteThread(ctx.igniteInstanceName(), 
"schema-circuit-breaker-" + op.id(),
+                                new Runnable() {
+                                @Override public void run() {
+                                    onSchemaPropose(nextOp.proposeMessage());
+                                }
+                            }).start();
+                        }
+                    }
+                }
+            });
+        }
+
+        /**
+         * Unwind operation queue and get tail operation.
+         *
+         * @return Tail operation.
+         */
+        public SchemaOperation unwind() {
+            if (next == null)
+                return this;
+            else
+                return next.unwind();
+        }
+
+        /**
+         * Whether operation started.
+         *
+         * @return {@code True} if started.
+         */
+        public boolean started() {
+            return mgr != null;
+        }
+
+        /**
+         * @return Operation manager.
+         */
+        public SchemaOperationManager manager() {
+            return mgr;
+        }
+
+        /**
+         * @param mgr Operation manager.
+         */
+        public void manager(SchemaOperationManager mgr) {
+            assert this.mgr == null;
+
+            this.mgr = mgr;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
index 44c41c1..b7434d3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
@@ -81,6 +81,13 @@ public interface GridQueryTypeDescriptor {
     public Map<String, GridQueryIndexDescriptor> indexes();
 
     /**
+     * Get text index for this type (if any).
+     *
+     * @return Text index or {@code null}.
+     */
+    public GridQueryIndexDescriptor textIndex();
+
+    /**
      * Gets value class.
      *
      * @return Value class.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
index b15007e..0666493 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
@@ -81,6 +81,13 @@ public class IgniteSQLException extends IgniteException {
     }
 
     /**
+     * @return Ignite SQL error code.
+     */
+    public int statusCode() {
+        return statusCode;
+    }
+
+    /**
      * @return JDBC exception containing details from this instance.
      */
     public SQLException toJdbcException() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java
index 9d2d20c..1b85af5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.processors.query;
 
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
@@ -45,22 +47,48 @@ public class QueryIndexDescriptorImpl implements 
GridQueryIndexDescriptor {
     /** Fields which should be indexed in descending order. */
     private Collection<String> descendings;
 
+    /** Type descriptor. */
+    @GridToStringExclude
+    private final QueryTypeDescriptorImpl typDesc;
+
+    /** Index name. */
+    private final String name;
+
     /** */
     private final QueryIndexType type;
 
     /** */
-    private int inlineSize;
+    private final int inlineSize;
 
     /**
+     * Constructor.
+     *
+     * @param typDesc Type descriptor.
+     * @param name Index name.
      * @param type Type.
+     * @param inlineSize Inline size.
      */
-    public QueryIndexDescriptorImpl(QueryIndexType type, int inlineSize) {
+    public QueryIndexDescriptorImpl(QueryTypeDescriptorImpl typDesc, String 
name, QueryIndexType type, int inlineSize) {
         assert type != null;
 
+        this.typDesc = typDesc;
+        this.name = name;
         this.type = type;
         this.inlineSize = inlineSize;
     }
 
+    /**
+     * @return Type descriptor.
+     */
+    public QueryTypeDescriptorImpl typeDescriptor() {
+        return typDesc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
     /** {@inheritDoc} */
     @Override public Collection<String> fields() {
         Collection<String> res = new ArrayList<>(fields.size());
@@ -87,8 +115,14 @@ public class QueryIndexDescriptorImpl implements 
GridQueryIndexDescriptor {
      * @param field Field name.
      * @param orderNum Field order number in this index.
      * @param descending Sort order.
+     * @return This instance for chaining.
+     * @throws IgniteCheckedException If failed.
      */
-    public void addField(String field, int orderNum, boolean descending) {
+    public QueryIndexDescriptorImpl addField(String field, int orderNum, 
boolean descending)
+        throws IgniteCheckedException {
+        if (!typDesc.hasField(field))
+            throw new IgniteCheckedException("Field not found: " + field);
+
         fields.add(new T2<>(field, orderNum));
 
         if (descending) {
@@ -97,6 +131,8 @@ public class QueryIndexDescriptorImpl implements 
GridQueryIndexDescriptor {
 
             descendings.add(field);
         }
+
+        return this;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java
new file mode 100644
index 0000000..f580111
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Serializable;
+
+/**
+ * Index key.
+ */
+public class QueryIndexKey implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Space. */
+    private final String space;
+
+    /** Name. */
+    private final String name;
+
+    /**
+     * Constructor.
+     *
+     * @param space Space.
+     * @param name Name.
+     */
+    public QueryIndexKey(String space, String name) {
+        this.space = space;
+        this.name = name;
+    }
+
+    /**
+     * @return Space.
+     */
+    public String space() {
+        return space;
+    }
+
+    /**
+     * @return Name.
+     */
+    public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * (space != null ? space.hashCode() : 0) + (name != null ? 
name.hashCode() : 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        QueryIndexKey other = (QueryIndexKey)o;
+
+        return F.eq(name, other.name) && F.eq(space, other.space);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(QueryIndexKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
new file mode 100644
index 0000000..395f077
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import 
org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
+import 
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperation;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Dynamic cache schema.
+ */
+public class QuerySchema implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Query entities. */
+    private final Collection<QueryEntity> entities = new LinkedList<>();
+
+    /** Mutex for state synchronization. */
+    private final Object mux = new Object();
+
+    /**
+     * Default constructor.
+     */
+    public QuerySchema() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param entities Query entities.
+     */
+    public QuerySchema(Collection<QueryEntity> entities) {
+        assert entities != null;
+
+        for (QueryEntity qryEntity : entities)
+            this.entities.add(new QueryEntity(qryEntity));
+    }
+
+    /**
+     * Copy object.
+     *
+     * @return Copy.
+     */
+    public QuerySchema copy() {
+        synchronized (mux) {
+            QuerySchema res = new QuerySchema();
+
+            for (QueryEntity qryEntity : entities)
+                res.entities.add(new QueryEntity(qryEntity));
+
+            return res;
+        }
+    }
+
+    /**
+     * Apply a finished schema operation (index create or drop) to the stored entities.
+     *
+     * @param msg Finish message carrying the operation.
+     */
+    public void finish(SchemaFinishDiscoveryMessage msg) {
+        synchronized (mux) {
+            SchemaAbstractOperation op = msg.operation();
+
+            if (op instanceof SchemaIndexCreateOperation) {
+                SchemaIndexCreateOperation op0 = 
(SchemaIndexCreateOperation)op;
+
+                for (QueryEntity entity : entities) {
+                    String tblName = QueryUtils.tableName(entity);
+
+                    if (F.eq(tblName, op0.tableName())) {
+                        boolean exists = false;
+
+                        for (QueryIndex idx : entity.getIndexes()) {
+                            if (F.eq(idx.getName(), op0.indexName())) {
+                                exists = true;
+
+                                break;
+                            }
+                        }
+
+                        if (!exists) {
+                            List<QueryIndex> idxs = new 
ArrayList<>(entity.getIndexes());
+
+                            idxs.add(op0.index());
+
+                            entity.clearIndexes();
+                            entity.setIndexes(idxs);
+                        }
+
+                        break;
+                    }
+                }
+            }
+            else {
+                assert op instanceof SchemaIndexDropOperation;
+
+                SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op;
+
+                for (QueryEntity entity : entities) {
+                    Collection<QueryIndex> idxs = entity.getIndexes();
+
+                    QueryIndex victim = null;
+
+                    for (QueryIndex idx : idxs) {
+                        if (F.eq(idx.getName(), op0.indexName())) {
+                            victim = idx;
+
+                            break;
+                        }
+                    }
+
+                    if (victim != null) {
+                        List<QueryIndex> newIdxs = new 
ArrayList<>(entity.getIndexes());
+
+                        newIdxs.remove(victim);
+                        // Install the filtered copy, not the original collection.
+                        entity.clearIndexes();
+                        entity.setIndexes(newIdxs);
+
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Query entities.
+     */
+    public Collection<QueryEntity> entities() {
+        synchronized (mux) {
+            return new ArrayList<>(entities);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(QuerySchema.class, this);
+    }
+}

Reply via email to