http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 2b957be..ceb139a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -17,34 +17,29 @@ package org.apache.ignite.internal.processors.query; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; -import javax.cache.Cache; -import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.binary.Binarylizable; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.NodeStoppingException; import 
org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -54,7 +49,23 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken; +import org.apache.ignite.internal.processors.query.schema.SchemaKey; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationClientFuture; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationManager; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationWorker; +import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage; +import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; +import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage; +import 
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperation; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -63,17 +74,45 @@ import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.util.worker.GridWorkerFuture; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; + +import javax.cache.Cache; +import javax.cache.CacheException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; +import static org.apache.ignite.internal.GridTopic.TOPIC_SCHEMA; import static org.apache.ignite.internal.IgniteComponentType.INDEXING; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SCHEMA_POOL; /** * Indexing processor. @@ -82,26 +121,70 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** Queries detail metrics eviction frequency. */ private static final int QRY_DETAIL_METRICS_EVICTION_FREQ = 3_000; + /** */ + private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>(); + /** For tests. */ public static Class<? extends GridQueryIndexing> idxCls; + /** JDK marshaller to serialize errors. */ + private final JdkMarshaller marsh = new JdkMarshaller(); + /** */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + /** */ + private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask; + /** Type descriptors. */ - private final Map<QueryTypeIdKey, QueryTypeDescriptorImpl> types = new ConcurrentHashMap8<>(); + private final Map<QueryTypeIdKey, QueryTypeDescriptorImpl> types = new ConcurrentHashMap<>(); /** Type descriptors. */ - private final ConcurrentMap<QueryTypeNameKey, QueryTypeDescriptorImpl> typesByName = new ConcurrentHashMap8<>(); + private final ConcurrentMap<QueryTypeNameKey, QueryTypeDescriptorImpl> typesByName = new ConcurrentHashMap<>(); /** */ private final GridQueryIndexing idx; - /** */ - private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask; + /** All indexes. 
*/ + private final ConcurrentMap<QueryIndexKey, QueryIndexDescriptorImpl> idxs = new ConcurrentHashMap<>(); - /** */ - private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>(); + /** Schema operation futures created on client side. */ + private final ConcurrentMap<UUID, SchemaOperationClientFuture> schemaCliFuts = new ConcurrentHashMap<>(); + + /** IO message listener. */ + private final GridMessageListener ioLsnr; + + /** Schema operations. */ + private final ConcurrentHashMap<SchemaKey, SchemaOperation> schemaOps = new ConcurrentHashMap<>(); + + /** Active propose messages. */ + private final LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> activeProposals = new LinkedHashMap<>(); + + /** General state mutex. */ + private final Object stateMux = new Object(); + + /** Coordinator node (initialized lazily). */ + private ClusterNode crd; + + /** Registered spaces. */ + private final Collection<String> spaces = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + + /** ID history for index create/drop discovery messages. */ + private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> dscoMsgIdHist = + new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize()); + + /** History of already completed operations. */ + private final GridBoundedConcurrentLinkedHashSet<UUID> completedOpIds = + new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize()); + + /** Pending status messages. */ + private final LinkedList<SchemaOperationStatusMessage> pendingMsgs = new LinkedList<>(); + + /** Disconnected flag. */ + private boolean disconnected; + + /** Whether exchange thread is ready to process further requests. */ + private boolean exchangeReady; /** */ private boolean skipFieldLookup; @@ -119,6 +202,20 @@ public class GridQueryProcessor extends GridProcessorAdapter { } else idx = INDEXING.inClassPath() ? 
U.<GridQueryIndexing>newInstance(INDEXING.className()) : null; + + ioLsnr = new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + if (msg instanceof SchemaOperationStatusMessage) { + SchemaOperationStatusMessage msg0 = (SchemaOperationStatusMessage)msg; + + msg0.senderNodeId(nodeId); + + processStatusMessage(msg0); + } + else + U.warn(log, "Unsupported IO message: " + msg); + } + }; } /** {@inheritDoc} */ @@ -131,6 +228,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { idx.start(ctx, busyLock); } + ctx.io().addMessageListener(TOPIC_SCHEMA, ioLsnr); + // Schedule queries detail metrics eviction. qryDetailMetricsEvictTask = ctx.timeout().schedule(new Runnable() { @Override public void run() { @@ -140,6 +239,401 @@ public class GridQueryProcessor extends GridProcessorAdapter { }, QRY_DETAIL_METRICS_EVICTION_FREQ, QRY_DETAIL_METRICS_EVICTION_FREQ); } + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); + + if (cancel && idx != null) { + try { + while (!busyLock.tryBlock(500)) + idx.cancelAllQueries(); + + return; + } catch (InterruptedException ignored) { + U.warn(log, "Interrupted while waiting for active queries cancellation."); + + Thread.currentThread().interrupt(); + } + } + + busyLock.block(); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); + + ctx.io().removeMessageListener(TOPIC_SCHEMA, ioLsnr); + + if (idx != null) + idx.stop(); + + U.closeQuiet(qryDetailMetricsEvictTask); + } + + /** + * Handle cache kernal start. At this point discovery and IO managers are operational, caches are not started yet. + * + * @throws IgniteCheckedException If failed. + */ + public void onCacheKernalStart() throws IgniteCheckedException { + synchronized (stateMux) { + exchangeReady = true; + + // Re-run pending top-level proposals. 
+ for (SchemaOperation schemaOp : schemaOps.values()) + onSchemaPropose(schemaOp.proposeMessage()); + } + } + + /** + * Handle cache reconnect. + * + * @throws IgniteCheckedException If failed. + */ + public void onCacheReconnect() throws IgniteCheckedException { + synchronized (stateMux) { + assert disconnected; + + disconnected = false; + + onCacheKernalStart(); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { + return DiscoveryDataExchangeType.QUERY_PROC; + } + + /** {@inheritDoc} */ + @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { + // Collect active proposals. + synchronized (stateMux) { + LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> data = new LinkedHashMap<>(activeProposals); + + dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), data); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { + synchronized (stateMux) { + // Preserve proposals. + LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> data0 = + (LinkedHashMap<UUID, SchemaProposeDiscoveryMessage>)data.commonData(); + + // Process proposals as if they were received as regular discovery messages. + if (data0 != null) { + for (SchemaProposeDiscoveryMessage activeProposal : data0.values()) + onSchemaProposeDiscovery0(activeProposal); + } + } + } + + /** + * Process schema propose message from discovery thread. + * + * @param msg Message. + * @return {@code True} if exchange should be triggered. + */ + private boolean onSchemaProposeDiscovery(SchemaProposeDiscoveryMessage msg) { + UUID opId = msg.operation().id(); + String space = msg.operation().space(); + + if (!msg.initialized()) { + // Ensure cache exists on coordinator node. 
+ DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(space); + + if (cacheDesc == null) { + if (log.isDebugEnabled()) + log.debug("Received schema propose discovery message, but cache doesn't exist " + + "(will report error) [opId=" + opId + ", msg=" + msg + ']'); + + msg.onError(new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, space)); + } + else { + CacheConfiguration ccfg = cacheDesc.cacheConfiguration(); + + if (ccfg.getCacheMode() == CacheMode.LOCAL) { + // Distributed operation is not allowed on LOCAL caches. + if (log.isDebugEnabled()) + log.debug("Received schema propose discovery message, but cache is LOCAL " + + "(will report error) [opId=" + opId + ", msg=" + msg + ']'); + + msg.onError(new SchemaOperationException("Schema changes are not supported for LOCAL cache.")); + } + else { + // Preserve deployment ID so that we can distinguish between different caches with the same name. + if (msg.deploymentId() == null) + msg.deploymentId(cacheDesc.deploymentId()); + + assert F.eq(cacheDesc.deploymentId(), msg.deploymentId()); + } + } + } + + // Complete client future and exit immediately in case of error. + if (msg.hasError()) { + SchemaOperationClientFuture cliFut = schemaCliFuts.remove(opId); + + if (cliFut != null) + cliFut.onDone(msg.error()); + + return false; + } + + return onSchemaProposeDiscovery0(msg); + } + + /** + * Process schema propose message from discovery thread (or from cache start routine). + * + * @param msg Message. + * @return {@code True} if exchange should be triggered. 
+ */ + private boolean onSchemaProposeDiscovery0(SchemaProposeDiscoveryMessage msg) { + UUID opId = msg.operation().id(); + + synchronized (stateMux) { + if (disconnected) { + if (log.isDebugEnabled()) + log.debug("Processing discovery schema propose message, but node is disconnected (will ignore) " + + "[opId=" + opId + ", msg=" + msg + ']'); + + return false; + } + + if (log.isDebugEnabled()) + log.debug("Processing discovery schema propose message [opId=" + opId + ", msg=" + msg + ']'); + + // Put message to active operations set. + SchemaProposeDiscoveryMessage oldDesc = activeProposals.put(msg.operation().id(), msg); + + assert oldDesc == null; + + // Create schema operation and either trigger it immediately from exchange thread or append to already + // running operation. + SchemaOperation schemaOp = new SchemaOperation(msg); + + SchemaKey key = msg.schemaKey(); + + SchemaOperation prevSchemaOp = schemaOps.get(key); + + if (prevSchemaOp != null) { + prevSchemaOp = prevSchemaOp.unwind(); + + if (log.isDebugEnabled()) + log.debug("Schema change is enqueued and will be executed after previous operation is completed " + + "[opId=" + opId + ", prevOpId=" + prevSchemaOp.id() + ']'); + + prevSchemaOp.next(schemaOp); + + return false; + } + else { + schemaOps.put(key, schemaOp); + + return exchangeReady; + } + } + } + + /** + * Handle schema propose from exchange thread. + * + * @param msg Discovery message. 
+ */ + @SuppressWarnings("ThrowableInstanceNeverThrown") + public void onSchemaPropose(SchemaProposeDiscoveryMessage msg) { + UUID opId = msg.operation().id(); + + if (log.isDebugEnabled()) + log.debug("Processing schema propose message (exchange) [opId=" + opId + ']'); + + synchronized (stateMux) { + if (disconnected) + return; + + SchemaOperation curOp = schemaOps.get(msg.schemaKey()); + + assert curOp != null; + assert F.eq(opId, curOp.id()); + assert !curOp.started(); + + startSchemaChange(curOp); + } + } + + /** + * Process schema finish message from discovery thread. + * + * @param msg Message. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + private void onSchemaFinishDiscovery(SchemaFinishDiscoveryMessage msg) { + UUID opId = msg.operation().id(); + + if (log.isDebugEnabled()) + log.debug("Received schema finish message (discovery) [opId=" + opId + ", msg=" + msg + ']'); + + synchronized (stateMux) { + if (disconnected) + return; + + boolean completedOpAdded = completedOpIds.add(opId); + + assert completedOpAdded; + + // Remove propose message so that it will not be shared with joining nodes. + SchemaProposeDiscoveryMessage proposeMsg = activeProposals.remove(opId); + + assert proposeMsg != null; + + // Apply changes to public cache schema if operation is successful and original cache is still there. + if (!msg.hasError()) { + DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(msg.operation().space()); + + if (cacheDesc != null && F.eq(cacheDesc.deploymentId(), proposeMsg.deploymentId())) + cacheDesc.schemaChangeFinish(msg); + } + + // Attach propose message to the finish message so that it is available from exchange thread. + msg.proposeMessage(proposeMsg); + + if (exchangeReady) { + SchemaOperation op = schemaOps.get(proposeMsg.schemaKey()); + + if (F.eq(op.id(), opId)) { + // Completed top operation. + op.finishMessage(msg); + + if (op.started()) + op.doFinish(); + } + else { + // Completed operation in the middle, will schedule completion later. 
+ while (op != null) { + if (F.eq(op.id(), opId)) + break; + + op = op.next(); + } + + assert op != null; + assert !op.started(); + + op.finishMessage(msg); + } + } + else { + // Set next operation as top-level one. + SchemaKey schemaKey = proposeMsg.schemaKey(); + + SchemaOperation op = schemaOps.remove(schemaKey); + + assert op != null; + assert F.eq(op.id(), opId); + + // Chain to the next operation (if any). + SchemaOperation nextOp = op.next(); + + if (nextOp != null) + schemaOps.put(schemaKey, nextOp); + } + + // Clean stale IO messages from just-joined nodes. + cleanStaleStatusMessages(opId); + } + + // Complete client future (if any). + SchemaOperationClientFuture cliFut = schemaCliFuts.remove(opId); + + if (cliFut != null) { + if (msg.hasError()) + cliFut.onDone(msg.error()); + else + cliFut.onDone(); + } + } + + /** + * Initiate actual schema change operation. + * + * @param schemaOp Schema operation. + */ + @SuppressWarnings({"unchecked", "ThrowableInstanceNeverThrown"}) + private void startSchemaChange(SchemaOperation schemaOp) { + assert Thread.holdsLock(stateMux); + assert !schemaOp.started(); + + // Get current cache state. + SchemaProposeDiscoveryMessage msg = schemaOp.proposeMessage(); + + String space = msg.operation().space(); + + DynamicCacheDescriptor cacheDesc = ctx.cache().cacheDescriptor(space); + + boolean cacheExists = cacheDesc != null && F.eq(msg.deploymentId(), cacheDesc.deploymentId()); + + boolean cacheRegistered = cacheExists && spaces.contains(CU.mask(space)); + + // Validate schema state and decide whether we should proceed or not. + SchemaAbstractOperation op = msg.operation(); + + QueryTypeDescriptorImpl type = null; + SchemaOperationException err; + + boolean nop = false; + + if (cacheExists) { + if (cacheRegistered) { + // If cache is started, we perform validation against real schema. 
+ T3<QueryTypeDescriptorImpl, Boolean, SchemaOperationException> res = prepareChangeOnStartedCache(op); + + assert res.get2() != null; + + type = res.get1(); + nop = res.get2(); + err = res.get3(); + } + else { + // If cache is not started yet, there is no schema. Take schema from cache descriptor and validate. + QuerySchema schema = cacheDesc.schema(); + + T2<Boolean, SchemaOperationException> res = prepareChangeOnNotStartedCache(op, schema); + + assert res.get1() != null; + + type = null; + nop = res.get1(); + err = res.get2(); + } + } + else + err = new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, op.space()); + + // Start operation. + SchemaOperationWorker worker = + new SchemaOperationWorker(ctx, this, msg.deploymentId(), op, nop, err, cacheRegistered, type); + + SchemaOperationManager mgr = new SchemaOperationManager(ctx, this, worker, + ctx.clientNode() ? null : coordinator()); + + schemaOp.manager(mgr); + + mgr.start(); + + // Unwind pending IO messages. + if (!ctx.clientNode() && coordinator().isLocal()) + unwindPendingMessages(schemaOp.id(), mgr); + + // Schedule operation finish handling if needed. + if (schemaOp.hasFinishMessage()) + schemaOp.doFinish(); + } + /** * @return {@code true} If indexing module is in classpath and successfully initialized. */ @@ -148,55 +642,115 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * @return Indexing. + * @throws IgniteException If module is not enabled. + */ + public GridQueryIndexing getIndexing() throws IgniteException { + checkxEnabled(); + + return idx; + } + + /** * @param cctx Cache context. + * @param schema Initial schema. * @throws IgniteCheckedException If failed. 
*/ - @SuppressWarnings("deprecation") - private void initializeCache(GridCacheContext<?, ?> cctx) throws IgniteCheckedException { + @SuppressWarnings({"deprecation", "ThrowableResultOfMethodCallIgnored"}) + private void initializeCache(GridCacheContext<?, ?> cctx, QuerySchema schema) throws IgniteCheckedException { String space = cctx.name(); - CacheConfiguration<?,?> ccfg = cctx.config(); - // Prepare candidates. List<Class<?>> mustDeserializeClss = new ArrayList<>(); Collection<QueryTypeCandidate> cands = new ArrayList<>(); - if (!F.isEmpty(ccfg.getQueryEntities())) { - for (QueryEntity qryEntity : ccfg.getQueryEntities()) { + Collection<QueryEntity> qryEntities = schema.entities(); + + if (!F.isEmpty(qryEntities)) { + for (QueryEntity qryEntity : qryEntities) { QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(space, cctx, qryEntity, mustDeserializeClss); cands.add(cand); } } - // Register candidates. - idx.registerCache(space, cctx, cctx.config()); + // Ensure that candidates have unique table and index names. Otherwise we will not be able to apply pending operations. 
+ Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>(); + Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>(); - try { - for (QueryTypeCandidate cand : cands) { - QueryTypeIdKey typeId = cand.typeId(); - QueryTypeIdKey altTypeId = cand.alternativeTypeId(); - QueryTypeDescriptorImpl desc = cand.descriptor(); + for (QueryTypeCandidate cand : cands) { + QueryTypeDescriptorImpl desc = cand.descriptor(); - if (typesByName.putIfAbsent(new QueryTypeNameKey(space, desc.name()), desc) != null) - throw new IgniteCheckedException("Type with name '" + desc.name() + "' already indexed " + - "in cache '" + space + "'."); + QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc); - types.put(typeId, desc); + if (oldDesc != null) + throw new IgniteException("Duplicate table name [tblName=" + desc.tableName() + + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']'); - if (altTypeId != null) - types.put(altTypeId, desc); + for (String idxName : desc.indexes().keySet()) { + oldDesc = idxTypMap.put(idxName, desc); - idx.registerType(space, desc); + if (oldDesc != null) + throw new IgniteException("Duplicate index name [idxName=" + idxName + + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']'); } } - catch (IgniteCheckedException | RuntimeException e) { - unregisterCache0(space); - throw e; + // Apply pending operation which could have been completed as no-op at this point. There could be only one + // in-flight operation for a cache. 
+ synchronized (stateMux) { + if (disconnected) + return; + + for (SchemaOperation op : schemaOps.values()) { + if (F.eq(op.proposeMessage().deploymentId(), cctx.dynamicDeploymentId())) { + if (op.started()) { + SchemaOperationWorker worker = op.manager().worker(); + + assert !worker.cacheRegistered(); + + if (!worker.nop()) { + IgniteInternalFuture fut = worker.future(); + + assert fut.isDone(); + + if (fut.error() == null) { + SchemaAbstractOperation op0 = op.proposeMessage().operation(); + + if (op0 instanceof SchemaIndexCreateOperation) { + SchemaIndexCreateOperation opCreate = (SchemaIndexCreateOperation)op0; + + QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opCreate.tableName()); + + assert typeDesc != null; + + QueryUtils.processDynamicIndexChange(opCreate.indexName(), opCreate.index(), + typeDesc); + } + else if (op0 instanceof SchemaIndexDropOperation) { + SchemaIndexDropOperation opDrop = (SchemaIndexDropOperation)op0; + + QueryTypeDescriptorImpl typeDesc = idxTypMap.get(opDrop.indexName()); + + assert typeDesc != null; + + QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc); + } + else + assert false; + } + } + } + + break; + } + } } + // Ready to register at this point. + registerCache0(space, cctx, cands); + // Warn about possible implicit deserialization. 
if (!mustDeserializeClss.isEmpty()) { U.warn(log, "Some classes in query configuration cannot be written in binary format " + @@ -209,46 +763,41 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - if (cancel && idx != null) - try { - while (!busyLock.tryBlock(500)) - idx.cancelAllQueries(); - - return; - } - catch (InterruptedException ignored) { - U.warn(log, "Interrupted while waiting for active queries cancellation."); + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + Collection<SchemaOperationClientFuture> futs; - Thread.currentThread().interrupt(); - } + synchronized (stateMux) { + disconnected = true; + exchangeReady = false; - busyLock.block(); - } + // Clear client futures. + futs = new ArrayList<>(schemaCliFuts.values()); - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - super.stop(cancel); + schemaCliFuts.clear(); - if (idx != null) - idx.stop(); + // Clear operations data. + activeProposals.clear(); + schemaOps.clear(); + } - U.closeQuiet(qryDetailMetricsEvictTask); - } + // Complete client futures outside of synchronized block because they may have listeners/chains. + for (SchemaOperationClientFuture fut : futs) + fut.onDone(new SchemaOperationException("Client node is disconnected (operation result is unknown).")); - /** {@inheritDoc} */ - @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { if (idx != null) idx.onDisconnected(reconnectFut); } /** + * Handle cache start. Invoked either from GridCacheProcessor.onKernalStart() method or from exchange worker. + * When called for the first time, we initialize topology thus understanding whether current node is coordinator + * or not. + * * @param cctx Cache context. + * @param schema Index states. 
* @throws IgniteCheckedException If failed. */ - public void onCacheStart(GridCacheContext cctx) throws IgniteCheckedException { + public void onCacheStart(GridCacheContext cctx, QuerySchema schema) throws IgniteCheckedException { if (idx == null) return; @@ -258,45 +807,506 @@ public class GridQueryProcessor extends GridProcessorAdapter { cctx.shared().database().checkpointReadLock(); try { - initializeCache(cctx); + initializeCache(cctx, schema); } finally { cctx.shared().database().checkpointReadUnlock(); - busyLock.leaveBusy(); + busyLock.leaveBusy(); + } + } + + /** + * @param cctx Cache context. + */ + public void onCacheStop(GridCacheContext cctx) { + if (idx == null) + return; + + if (!busyLock.enterBusy()) + return; + + try { + unregisterCache0(cctx.name()); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @return Skip field lookup flag. + */ + public boolean skipFieldLookup() { + return skipFieldLookup; + } + + /** + * @param skipFieldLookup Skip field lookup flag. + */ + public void skipFieldLookup(boolean skipFieldLookup) { + this.skipFieldLookup = skipFieldLookup; + } + + /** + * Handle custom discovery message. + * + * @param msg Message. 
+ */ + public void onDiscovery(SchemaAbstractDiscoveryMessage msg) { + IgniteUuid id = msg.id(); + + if (!dscoMsgIdHist.add(id)) { + U.warn(log, "Received duplicate schema custom discovery message (will ignore) [opId=" + + msg.operation().id() + ", msg=" + msg +']'); + + return; + } + + if (msg instanceof SchemaProposeDiscoveryMessage) { + SchemaProposeDiscoveryMessage msg0 = (SchemaProposeDiscoveryMessage)msg; + + boolean exchange = onSchemaProposeDiscovery(msg0); + + msg0.exchange(exchange); + } + else if (msg instanceof SchemaFinishDiscoveryMessage) { + SchemaFinishDiscoveryMessage msg0 = (SchemaFinishDiscoveryMessage)msg; + + onSchemaFinishDiscovery(msg0); + } + else + U.warn(log, "Received unsupported schema custom discovery message (will ignore) [opId=" + + msg.operation().id() + ", msg=" + msg +']'); + } + + /** + * Prepare change on started cache. + * + * @param op Operation. + * @return Result: affected type, nop flag, error. + */ + private T3<QueryTypeDescriptorImpl, Boolean, SchemaOperationException> prepareChangeOnStartedCache( + SchemaAbstractOperation op) { + QueryTypeDescriptorImpl type = null; + boolean nop = false; + SchemaOperationException err = null; + + String space = op.space(); + + if (op instanceof SchemaIndexCreateOperation) { + SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation) op; + + QueryIndex idx = op0.index(); + + // Make sure table exists. + String tblName = op0.tableName(); + + type = type(space, tblName); + + if (type == null) + err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, tblName); + else { + // Make sure that index can be applied to the given table. + for (String idxField : idx.getFieldNames()) { + if (!type.fields().containsKey(idxField)) { + err = new SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND, + idxField); + + break; + } + } + } + + // Check conflict with other indexes. 
+ if (err == null) { + String idxName = op0.index().getName(); + + QueryIndexKey idxKey = new QueryIndexKey(space, idxName); + + if (idxs.get(idxKey) != null) { + if (op0.ifNotExists()) + nop = true; + else + err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_EXISTS, idxName); + } + } + } + else if (op instanceof SchemaIndexDropOperation) { + SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) op; + + String idxName = op0.indexName(); + + QueryIndexDescriptorImpl oldIdx = idxs.get(new QueryIndexKey(space, idxName)); + + if (oldIdx == null) { + if (op0.ifExists()) + nop = true; + else + err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_NOT_FOUND, idxName); + } + else + type = oldIdx.typeDescriptor(); + } + else + err = new SchemaOperationException("Unsupported operation: " + op); + + return new T3<>(type, nop, err); + } + + /** + * Prepare operation on non-started cache. + * + * @param op Operation. + * @param schema Known cache schema. + * @return Result: nop flag, error. + */ + private T2<Boolean, SchemaOperationException> prepareChangeOnNotStartedCache(SchemaAbstractOperation op, + QuerySchema schema) { + boolean nop = false; + SchemaOperationException err = null; + + // Build table and index maps. 
+ Map<String, QueryEntity> tblMap = new HashMap<>(); + Map<String, T2<QueryEntity, QueryIndex>> idxMap = new HashMap<>(); + + for (QueryEntity entity : schema.entities()) { + String tblName = QueryUtils.tableName(entity); + + QueryEntity oldEntity = tblMap.put(tblName, entity); + + if (oldEntity != null) { + err = new SchemaOperationException("Invalid schema state (duplicate table found): " + tblName); + + break; + } + + for (QueryIndex entityIdx : entity.getIndexes()) { + String idxName = QueryUtils.indexName(entity, entityIdx); + + T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.put(idxName, new T2<>(entity, entityIdx)); + + if (oldIdxEntity != null) { + err = new SchemaOperationException("Invalid schema state (duplicate index found): " + + idxName); + + break; + } + } + + if (err != null) + break; + } + + // Now check whether operation can be applied to schema. + if (op instanceof SchemaIndexCreateOperation) { + SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op; + + String idxName = op0.indexName(); + + T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.get(idxName); + + if (oldIdxEntity == null) { + String tblName = op0.tableName(); + + QueryEntity oldEntity = tblMap.get(tblName); + + if (oldEntity == null) + err = new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, tblName); + else { + for (String fieldName : op0.index().getFields().keySet()) { + Set<String> oldEntityFields = new HashSet<>(oldEntity.getFields().keySet()); + + for (Map.Entry<String, String> alias : oldEntity.getAliases().entrySet()) { + oldEntityFields.remove(alias.getKey()); + oldEntityFields.add(alias.getValue()); + } + + if (!oldEntityFields.contains(fieldName)) { + err = new SchemaOperationException(SchemaOperationException.CODE_COLUMN_NOT_FOUND, + fieldName); + + break; + } + } + } + } + else { + if (op0.ifNotExists()) + nop = true; + else + err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_EXISTS, idxName); + } + } + else if (op 
instanceof SchemaIndexDropOperation) { + SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op; + + String idxName = op0.indexName(); + + T2<QueryEntity, QueryIndex> oldIdxEntity = idxMap.get(idxName); + + if (oldIdxEntity == null) { + if (op0.ifExists()) + nop = true; + else + err = new SchemaOperationException(SchemaOperationException.CODE_INDEX_NOT_FOUND, idxName); + } + } + else + err = new SchemaOperationException("Unsupported operation: " + op); + + return new T2<>(nop, err); + } + + /** + * Invoked when coordinator finished ensuring that all participants are ready. + * + * @param op Operation. + * @param err Error (if any). + */ + public void onCoordinatorFinished(SchemaAbstractOperation op, @Nullable SchemaOperationException err) { + synchronized (stateMux) { + SchemaFinishDiscoveryMessage msg = new SchemaFinishDiscoveryMessage(op, err); + + try { + ctx.discovery().sendCustomEvent(msg); + } + catch (Exception e) { + // Failed to send finish message over discovery. This is something unrecoverable. + U.warn(log, "Failed to send schema finish discovery message [opId=" + op.id() + ']', e); + } + } + } + + /** + * Get current coordinator node. + * + * @return Coordinator node. + */ + private ClusterNode coordinator() { + assert !ctx.clientNode(); + + synchronized (stateMux) { + if (crd == null) { + ClusterNode crd0 = null; + + for (ClusterNode node : ctx.discovery().aliveServerNodes()) { + if (crd0 == null || crd0.order() > node.order()) + crd0 = node; + } + + assert crd0 != null; + + crd = crd0; + } + + return crd; + } + } + + /** + * Get rid of stale IO message received from other nodes which joined when operation had been in progress. + * + * @param opId Operation ID. 
+ */ + private void cleanStaleStatusMessages(UUID opId) { + Iterator<SchemaOperationStatusMessage> it = pendingMsgs.iterator(); + + while (it.hasNext()) { + SchemaOperationStatusMessage statusMsg = it.next(); + + if (F.eq(opId, statusMsg.operationId())) { + it.remove(); + + if (log.isDebugEnabled()) + log.debug("Dropped operation status message because it is already completed [opId=" + opId + + ", rmtNode=" + statusMsg.senderNodeId() + ']'); + } + } + } + + /** + * Apply positive index operation result. + * + * @param op Operation. + * @param type Type descriptor (if available); nothing is applied when it is {@code null} or obsolete. + */ + public void onLocalOperationFinished(SchemaAbstractOperation op, @Nullable QueryTypeDescriptorImpl type) { + synchronized (stateMux) { + if (disconnected) + return; + + // No need to apply anything to obsolete type. + if (type == null || type.obsolete()) { + if (log.isDebugEnabled()) + log.debug("Local operation finished, but type descriptor is either missing or obsolete " + + "(will ignore) [opId=" + op.id() + ']'); + + return; + } + + if (log.isDebugEnabled()) + log.debug("Local operation finished successfully [opId=" + op.id() + ']'); + + try { + if (op instanceof SchemaIndexCreateOperation) { + SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op; + + QueryUtils.processDynamicIndexChange(op0.indexName(), op0.index(), type); + + QueryIndexDescriptorImpl idxDesc = type.index(op0.indexName()); + + QueryIndexKey idxKey = new QueryIndexKey(op.space(), op0.indexName()); + + idxs.put(idxKey, idxDesc); + } + else { + assert op instanceof SchemaIndexDropOperation; + + SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) op; + + QueryUtils.processDynamicIndexChange(op0.indexName(), null, type); + + QueryIndexKey idxKey = new QueryIndexKey(op.space(), op0.indexName()); + + idxs.remove(idxKey); + } + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to finish index operation [opId=" + op.id() + " op=" + op + ']', e); + } + } + } + + /** + * Handle node leave.
+ * + * @param node Node. + */ + public void onNodeLeave(ClusterNode node) { + synchronized (stateMux) { + // Clients do not send status messages and are never coordinators. + if (ctx.clientNode()) + return; + + ClusterNode crd0 = coordinator(); + + if (F.eq(node.id(), crd0.id())) { + crd = null; + + crd0 = coordinator(); + } + + for (SchemaOperation op : schemaOps.values()) { + if (op.started()) { + op.manager().onNodeLeave(node.id(), crd0); + + if (crd0.isLocal()) + unwindPendingMessages(op.id(), op.manager()); + } + } + } + } + + /** + * Process index operation. + * + * @param op Operation. + * @param type Type descriptor. + * @param depId Cache deployment ID. + * @param cancelTok Cancel token. + * @throws SchemaOperationException If failed. + */ + public void processIndexOperationLocal(SchemaAbstractOperation op, QueryTypeDescriptorImpl type, IgniteUuid depId, + SchemaIndexOperationCancellationToken cancelTok) throws SchemaOperationException { + if (log.isDebugEnabled()) + log.debug("Started local index operation [opId=" + op.id() + ']'); + + String space = op.space(); + + GridCacheAdapter cache = ctx.cache().internalCache(op.space()); + + if (cache == null || !F.eq(depId, cache.context().dynamicDeploymentId())) + throw new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, op.space()); + + try { + if (op instanceof SchemaIndexCreateOperation) { + SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation) op; + + QueryIndexDescriptorImpl idxDesc = QueryUtils.createIndexDescriptor(type, op0.index()); + + SchemaIndexCacheVisitor visitor = + new SchemaIndexCacheVisitorImpl(this, cache.context(), space, op0.tableName(), cancelTok); + + idx.dynamicIndexCreate(space, op0.tableName(), idxDesc, op0.ifNotExists(), visitor); + } + else if (op instanceof SchemaIndexDropOperation) { + SchemaIndexDropOperation op0 = (SchemaIndexDropOperation) op; + + idx.dynamicIndexDrop(space, op0.indexName(), op0.ifExists()); + } + else + throw new 
SchemaOperationException("Unsupported operation: " + op); + } + catch (Exception e) { + if (e instanceof SchemaOperationException) + throw (SchemaOperationException)e; + else + throw new SchemaOperationException("Schema change operation failed: " + e.getMessage(), e); } } /** + * Register cache in indexing SPI. + * + * @param space Space. * @param cctx Cache context. + * @param cands Candidates. + * @throws IgniteCheckedException If failed. */ - public void onCacheStop(GridCacheContext cctx) { - if (idx == null) - return; + private void registerCache0(String space, GridCacheContext<?, ?> cctx, Collection<QueryTypeCandidate> cands) + throws IgniteCheckedException { + synchronized (stateMux) { + idx.registerCache(space, cctx, cctx.config()); - if (!busyLock.enterBusy()) - return; + try { + for (QueryTypeCandidate cand : cands) { + QueryTypeIdKey typeId = cand.typeId(); + QueryTypeIdKey altTypeId = cand.alternativeTypeId(); + QueryTypeDescriptorImpl desc = cand.descriptor(); - try { - unregisterCache0(cctx.name()); - } - finally { - busyLock.leaveBusy(); - } - } + if (typesByName.putIfAbsent(new QueryTypeNameKey(space, desc.name()), desc) != null) + throw new IgniteCheckedException("Type with name '" + desc.name() + "' already indexed " + + "in cache '" + space + "'."); - /** - * @return Skip field lookup flag. - */ - public boolean skipFieldLookup() { - return skipFieldLookup; - } + types.put(typeId, desc); - /** - * @param skipFieldLookup Skip field lookup flag. 
- */ - public void skipFieldLookup(boolean skipFieldLookup) { - this.skipFieldLookup = skipFieldLookup; + if (altTypeId != null) + types.put(altTypeId, desc); + + for (QueryIndexDescriptorImpl idx : desc.indexes0()) { + QueryIndexKey idxKey = new QueryIndexKey(space, idx.name()); + + QueryIndexDescriptorImpl oldIdx = idxs.putIfAbsent(idxKey, idx); + + if (oldIdx != null) { + throw new IgniteException("Duplicate index name [space=" + space + + ", idxName=" + idx.name() + ", existingTable=" + oldIdx.typeDescriptor().tableName() + + ", table=" + desc.tableName() + ']'); + } + } + + idx.registerType(space, desc); + } + + spaces.add(CU.mask(space)); + } + catch (IgniteCheckedException | RuntimeException e) { + unregisterCache0(space); + + throw e; + } + } } /** @@ -307,13 +1317,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { private void unregisterCache0(String space) { assert idx != null; - try { - idx.unregisterCache(space); - } - catch (Exception e) { - U.error(log, "Failed to clear indexing on cache unregister (will ignore): " + space, e); - } - finally { + synchronized (stateMux) { + // Clear types. Iterator<Map.Entry<QueryTypeIdKey, QueryTypeDescriptorImpl>> it = types.entrySet().iterator(); while (it.hasNext()) { @@ -323,9 +1328,79 @@ public class GridQueryProcessor extends GridProcessorAdapter { it.remove(); typesByName.remove(new QueryTypeNameKey(space, entry.getValue().name())); + + entry.getValue().markObsolete(); } } + + // Clear indexes. + Iterator<Map.Entry<QueryIndexKey, QueryIndexDescriptorImpl>> idxIt = idxs.entrySet().iterator(); + + while (idxIt.hasNext()) { + Map.Entry<QueryIndexKey, QueryIndexDescriptorImpl> idxEntry = idxIt.next(); + + QueryIndexKey idxKey = idxEntry.getKey(); + + if (F.eq(space, idxKey.space())) + idxIt.remove(); + } + + // Notify in-progress index operations. + for (SchemaOperation op : schemaOps.values()) { + if (op.started()) + op.manager().worker().cancel(); + } + + // Notify indexing. 
+ try { + idx.unregisterCache(space); + } + catch (Exception e) { + U.error(log, "Failed to clear indexing on cache unregister (will ignore): " + space, e); + } + + spaces.remove(CU.mask(space)); + } + } + + /** + * Check whether provided key and value belong to expected space and table. + * + * @param cctx Target cache context. + * @param expSpace Expected space. + * @param expTblName Expected table name. + * @param key Key. + * @param val Value. + * @return {@code True} if this key-value pair belongs to expected space/table, {@code false} otherwise or + * if space or table doesn't exist. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("ConstantConditions") + public boolean belongsToTable(GridCacheContext cctx, String expSpace, String expTblName, KeyCacheObject key, + CacheObject val) throws IgniteCheckedException { + QueryTypeDescriptorImpl desc = type(expSpace, val); + + if (desc == null) + return false; + + if (!F.eq(expTblName, desc.tableName())) + return false; + + if (!cctx.cacheObjects().isBinaryObject(val)) { + Class<?> valCls = val.value(cctx.cacheObjectContext(), false).getClass(); + + if (!desc.valueClass().isAssignableFrom(valCls)) + return false; } + + if (!cctx.cacheObjects().isBinaryObject(key)) { + Class<?> keyCls = key.value(cctx.cacheObjectContext(), false).getClass(); + + if (!desc.keyClass().isAssignableFrom(keyCls)) + return false; + } + + return true; } /** @@ -457,7 +1532,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (desc == null) return; - idx.store(space, desc, key, partId, val, ver, expirationTime, link); + idx.store(space, desc.name(), key, partId, val, ver, expirationTime, link); } finally { busyLock.leaveBusy(); @@ -518,6 +1593,30 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Gets type descriptor for space by given object's type. + * + * @param space Space name. + * @param val Object to determine type for. + * @return Type descriptor.
+ * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("ConstantConditions") + private QueryTypeDescriptorImpl type(@Nullable String space, CacheObject val) throws IgniteCheckedException { + CacheObjectContext coctx = cacheObjectContext(space); + + QueryTypeIdKey id; + + boolean binaryVal = ctx.cacheObjects().isBinaryObject(val); + + if (binaryVal) + id = new QueryTypeIdKey(space, ctx.cacheObjects().typeId(val)); + else + id = new QueryTypeIdKey(space, val.value(coctx, false).getClass()); + + return types.get(id); + } + + /** * @throws IgniteCheckedException If failed. */ private void checkEnabled() throws IgniteCheckedException { @@ -637,9 +1736,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException { String type = qry.getType(); - QueryTypeDescriptorImpl typeDesc = type(cctx.name(), type); + String typeName = typeName(cctx.name(), type); - qry.setType(typeDesc.name()); + qry.setType(typeName); sendQueryExecutedEvent( qry.getSql(), @@ -682,6 +1781,80 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Entry point for index create procedure. + * + * @param space Space name. + * @param tblName Table name. + * @param idx Index. + * @param ifNotExists When set to {@code true} operation will not fail if index already exists. + * @return Future completed when index is created. + */ + public IgniteInternalFuture<?> dynamicIndexCreate(String space, String tblName, QueryIndex idx, + boolean ifNotExists) { + SchemaAbstractOperation op = new SchemaIndexCreateOperation(UUID.randomUUID(), space, tblName, idx, ifNotExists); + + return startIndexOperationDistributed(op); + } + + /** + * Entry point for index drop procedure. + * + * @param idxName Index name. + * @param ifExists When set to {@code true} operation will not fail if index doesn't exist. + * @return Future completed when index is dropped.
+ */ + public IgniteInternalFuture<?> dynamicIndexDrop(String space, String idxName, boolean ifExists) { + SchemaAbstractOperation op = new SchemaIndexDropOperation(UUID.randomUUID(), space, idxName, ifExists); + + return startIndexOperationDistributed(op); + } + + /** + * Start distributed index change operation. + * + * @param op Operation. + * @return Future. + */ + private IgniteInternalFuture<?> startIndexOperationDistributed(SchemaAbstractOperation op) { + SchemaOperationClientFuture fut = new SchemaOperationClientFuture(op.id()); + + SchemaOperationClientFuture oldFut = schemaCliFuts.put(op.id(), fut); + + assert oldFut == null; + + try { + ctx.discovery().sendCustomEvent(new SchemaProposeDiscoveryMessage(op)); + + if (log.isDebugEnabled()) + log.debug("Sent schema propose discovery message [opId=" + op.id() + ", op=" + op + ']'); + + boolean disconnected0; + + synchronized (stateMux) { + disconnected0 = disconnected; + } + + if (disconnected0) { + fut.onDone(new SchemaOperationException("Client node is disconnected (operation result is unknown).")); + + schemaCliFuts.remove(op.id()); + } + } + catch (Exception e) { + if (e instanceof SchemaOperationException) + fut.onDone(e); + else { + fut.onDone(new SchemaOperationException("Failed to start schema change operation due to " + + "unexpected exception [opId=" + op.id() + ", op=" + op + ']', e)); + } + + schemaCliFuts.remove(op.id()); + } + + return fut; + } + + /** * @param sqlQry Sql query. * @param params Params. */ @@ -704,14 +1877,15 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * @param schema Schema. + * + * @param space Space name. * @param sql Query. * @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2. 
*/ - public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException { + public PreparedStatement prepareNativeStatement(String space, String sql) throws SQLException { checkxEnabled(); - return idx.prepareNativeStatement(schema, sql); + return idx.prepareNativeStatement(space, sql); } /** @@ -838,9 +2012,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { return executeQuery(GridCacheQueryType.TEXT, clause, cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() { @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException { - QueryTypeDescriptorImpl type = type(space, resType); + String typeName = typeName(space, resType); - return idx.queryLocalText(space, clause, type, filters); + return idx.queryLocalText(space, clause, typeName, filters); } }, true); } @@ -924,20 +2098,35 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * Gets type descriptor for space and type name. + * Get type descriptor for the given space and table name. + * @param space Space. + * @param tblName Table name. + * @return Type (if any). + */ + @Nullable private QueryTypeDescriptorImpl type(@Nullable String space, String tblName) { + for (QueryTypeDescriptorImpl type : types.values()) { + if (F.eq(space, type.space()) && F.eq(tblName, type.tableName())) + return type; + } + + return null; + } + + /** + * Gets type name for provided space and type name if type is still valid. * * @param space Space name. * @param typeName Type name. * @return Type descriptor. * @throws IgniteCheckedException If failed. 
*/ - public QueryTypeDescriptorImpl type(@Nullable String space, String typeName) throws IgniteCheckedException { + private String typeName(@Nullable String space, String typeName) throws IgniteCheckedException { QueryTypeDescriptorImpl type = typesByName.get(new QueryTypeNameKey(space, typeName)); if (type == null) throw new IgniteCheckedException("Failed to find SQL table for type: " + typeName); - return type; + return type.name(); } /** @@ -972,7 +2161,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw (IgniteCheckedException)err; } - catch (CacheException e) { + catch (CacheException | IgniteException e) { err = e; throw e; @@ -998,6 +2187,146 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Send status message to coordinator node. + * + * @param destNodeId Destination node ID. + * @param opId Operation ID. + * @param err Error. + */ + public void sendStatusMessage(UUID destNodeId, UUID opId, SchemaOperationException err) { + if (log.isDebugEnabled()) + log.debug("Sending schema operation status message [opId=" + opId + ", crdNode=" + destNodeId + + ", err=" + err + ']'); + + try { + byte[] errBytes = marshalSchemaError(opId, err); + + SchemaOperationStatusMessage msg = new SchemaOperationStatusMessage(opId, errBytes); + + // Messages must go to dedicated schema pool. We cannot push them to query pool because in this case + // they could be blocked with other query requests. + ctx.io().sendToGridTopic(destNodeId, TOPIC_SCHEMA, msg, SCHEMA_POOL); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send schema status response [opId=" + opId + ", destNodeId=" + destNodeId + + ", err=" + e + ']'); + } + } + + /** + * Process status message. + * + * @param msg Status message. 
+ */ + private void processStatusMessage(SchemaOperationStatusMessage msg) { + synchronized (stateMux) { + if (completedOpIds.contains(msg.operationId())) { + // Received message from a node which joined topology in the middle of operation execution. + if (log.isDebugEnabled()) + log.debug("Received status message for completed operation (will ignore) [" + + "opId=" + msg.operationId() + ", sndNodeId=" + msg.senderNodeId() + ']'); + + return; + } + + UUID opId = msg.operationId(); + + SchemaProposeDiscoveryMessage proposeMsg = activeProposals.get(opId); + + if (proposeMsg != null) { + SchemaOperation op = schemaOps.get(proposeMsg.schemaKey()); + + if (op != null && F.eq(op.id(), opId) && op.started() && coordinator().isLocal()) { + if (log.isDebugEnabled()) + log.debug("Received status message [opId=" + msg.operationId() + + ", sndNodeId=" + msg.senderNodeId() + ']'); + + op.manager().onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes())); + + return; + } + } + + // Put to pending set if operation is not visible/ready yet. + pendingMsgs.add(msg); + + if (log.isDebugEnabled()) + log.debug("Received status message (added to pending set) [opId=" + msg.operationId() + + ", sndNodeId=" + msg.senderNodeId() + ']'); + } + } + + /** + * Unwind pending messages for particular operation. + * + * @param opId Operation ID. + * @param mgr Manager. + */ + private void unwindPendingMessages(UUID opId, SchemaOperationManager mgr) { + assert Thread.holdsLock(stateMux); + + Iterator<SchemaOperationStatusMessage> it = pendingMsgs.iterator(); + + while (it.hasNext()) { + SchemaOperationStatusMessage msg = it.next(); + + if (F.eq(msg.operationId(), opId)) { + mgr.onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes())); + + it.remove(); + } + } + } + + /** + * Marshal schema error. + * + * @param err Error. + * @return Error bytes. 
+ */ + @Nullable private byte[] marshalSchemaError(UUID opId, @Nullable SchemaOperationException err) { + if (err == null) + return null; + + try { + return U.marshal(marsh, err); + } + catch (Exception e) { + U.warn(log, "Failed to marshal schema operation error [opId=" + opId + ", err=" + err + ']', e); + + try { + return U.marshal(marsh, new SchemaOperationException("Operation failed, but error cannot be " + + "serialized (see local node log for more details) [opId=" + opId + ", nodeId=" + + ctx.localNodeId() + ']')); + } + catch (Exception e0) { + assert false; // Impossible situation. + + return null; + } + } + } + + /** + * Unmarshal schema error. + * + * @param errBytes Error bytes. + * @return Error. + */ + @Nullable private SchemaOperationException unmarshalSchemaError(@Nullable byte[] errBytes) { + if (errBytes == null) + return null; + + try { + return U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config())); + } + catch (Exception e) { + return new SchemaOperationException("Operation failed, but error cannot be deserialized."); + } + } + + /** * @param ver Version. */ public static void setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) { @@ -1010,4 +2339,160 @@ public class GridQueryProcessor extends GridProcessorAdapter { public static AffinityTopologyVersion getRequestAffinityTopologyVersion() { return requestTopVer.get(); } + + /** + * Schema operation. + */ + private class SchemaOperation { + /** Original propose msg. */ + private final SchemaProposeDiscoveryMessage proposeMsg; + + /** Next schema operation. */ + private SchemaOperation next; + + /** Operation manager. */ + private SchemaOperationManager mgr; + + /** Finish message. */ + private SchemaFinishDiscoveryMessage finishMsg; + + /** Finish guard. */ + private final AtomicBoolean finishGuard = new AtomicBoolean(); + + /** + * Constructor. + * + * @param proposeMsg Original propose message. 
+ */ + public SchemaOperation(SchemaProposeDiscoveryMessage proposeMsg) { + this.proposeMsg = proposeMsg; + } + + /** + * @return Operation ID. + */ + public UUID id() { + return proposeMsg.operation().id(); + } + + /** + * @return Original propose message. + */ + public SchemaProposeDiscoveryMessage proposeMessage() { + return proposeMsg; + } + + /** + * @return Next schema operation. + */ + @Nullable public SchemaOperation next() { + return next; + } + + /** + * @param next Next schema operation. + */ + public void next(SchemaOperation next) { + this.next = next; + } + + /** + * @param finishMsg Finish message. + */ + public void finishMessage(SchemaFinishDiscoveryMessage finishMsg) { + this.finishMsg = finishMsg; + } + + /** + * @return {@code True} if finish request already received. + */ + public boolean hasFinishMessage() { + return finishMsg != null; + } + + /** + * Handle finish message. + */ + @SuppressWarnings("unchecked") + public void doFinish() { + assert started(); + + if (!finishGuard.compareAndSet(false, true)) + return; + + final UUID opId = id(); + final SchemaKey key = proposeMsg.schemaKey(); + + // Operation might be still in progress on client nodes which are not tracked by coordinator, + // so we chain to operation future instead of doing synchronous unwind. + mgr.worker().future().listen(new IgniteInClosure<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + synchronized (stateMux) { + SchemaOperation op = schemaOps.remove(key); + + assert op != null; + assert F.eq(op.id(), opId); + + // Chain to the next operation (if any). + final SchemaOperation nextOp = op.next(); + + if (nextOp != null) { + schemaOps.put(key, nextOp); + + if (log.isDebugEnabled()) + log.debug("Next schema change operation started [opId=" + nextOp.id() + ']'); + + assert !nextOp.started(); + + // Cannot execute operation synchronously because it may cause starvation in exchange + // thread under load. 
Hence, moving short-lived operation to separate worker. + new IgniteThread(ctx.igniteInstanceName(), "schema-circuit-breaker-" + op.id(), + new Runnable() { + @Override public void run() { + onSchemaPropose(nextOp.proposeMessage()); + } + }).start(); + } + } + } + }); + } + + /** + * Unwind operation queue and get tail operation. + * + * @return Tail operation. + */ + public SchemaOperation unwind() { + if (next == null) + return this; + else + return next.unwind(); + } + + /** + * Whether operation started. + * + * @return {@code True} if started. + */ + public boolean started() { + return mgr != null; + } + + /** + * @return Operation manager. + */ + public SchemaOperationManager manager() { + return mgr; + } + + /** + * @param mgr Operation manager. + */ + public void manager(SchemaOperationManager mgr) { + assert this.mgr == null; + + this.mgr = mgr; + } + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java index 44c41c1..b7434d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java @@ -81,6 +81,13 @@ public interface GridQueryTypeDescriptor { public Map<String, GridQueryIndexDescriptor> indexes(); /** + * Get text index for this type (if any). + * + * @return Text index or {@code null}. + */ + public GridQueryIndexDescriptor textIndex(); + + /** * Gets value class. * * @return Value class. http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java index b15007e..0666493 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java @@ -81,6 +81,13 @@ public class IgniteSQLException extends IgniteException { } /** + * @return Ignite SQL error code. + */ + public int statusCode() { + return statusCode; + } + + /** * @return JDBC exception containing details from this instance. 
*/ public SQLException toJdbcException() { http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java index 9d2d20c..1b85af5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexDescriptorImpl.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.processors.query; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.QueryIndexType; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; @@ -45,22 +47,48 @@ public class QueryIndexDescriptorImpl implements GridQueryIndexDescriptor { /** Fields which should be indexed in descending order. */ private Collection<String> descendings; + /** Type descriptor. */ + @GridToStringExclude + private final QueryTypeDescriptorImpl typDesc; + + /** Index name. */ + private final String name; + /** */ private final QueryIndexType type; /** */ - private int inlineSize; + private final int inlineSize; /** + * Constructor. + * + * @param typDesc Type descriptor. + * @param name Index name. * @param type Type. + * @param inlineSize Inline size. 
*/ - public QueryIndexDescriptorImpl(QueryIndexType type, int inlineSize) { + public QueryIndexDescriptorImpl(QueryTypeDescriptorImpl typDesc, String name, QueryIndexType type, int inlineSize) { assert type != null; + this.typDesc = typDesc; + this.name = name; this.type = type; this.inlineSize = inlineSize; } + /** + * @return Type descriptor. + */ + public QueryTypeDescriptorImpl typeDescriptor() { + return typDesc; + } + + /** {@inheritDoc} */ + @Override public String name() { + return name; + } + /** {@inheritDoc} */ @Override public Collection<String> fields() { Collection<String> res = new ArrayList<>(fields.size()); @@ -87,8 +115,14 @@ public class QueryIndexDescriptorImpl implements GridQueryIndexDescriptor { * @param field Field name. * @param orderNum Field order number in this index. * @param descending Sort order. + * @return This instance for chaining. + * @throws IgniteCheckedException If failed. */ - public void addField(String field, int orderNum, boolean descending) { + public QueryIndexDescriptorImpl addField(String field, int orderNum, boolean descending) + throws IgniteCheckedException { + if (!typDesc.hasField(field)) + throw new IgniteCheckedException("Field not found: " + field); + fields.add(new T2<>(field, orderNum)); if (descending) { @@ -97,6 +131,8 @@ public class QueryIndexDescriptorImpl implements GridQueryIndexDescriptor { descendings.add(field); } + + return this; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java new file mode 100644 index 0000000..f580111 --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexKey.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Serializable;
+
+/**
+ * Index key: an immutable (space, index name) pair; both parts take part in {@code equals} and {@code hashCode}.
+ */
+public class QueryIndexKey implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Space. */
+    private final String space;
+
+    /** Name. */
+    private final String name;
+
+    /**
+     * Creates a key from the given space and index name; the arguments are stored as is, without validation.
+     *
+     * @param space Space.
+     * @param name Name.
+     */
+    public QueryIndexKey(String space, String name) {
+        this.space = space;
+        this.name = name;
+    }
+
+    /**
+     * @return Space.
+     */
+    public String space() {
+        return space;
+    }
+
+    /**
+     * @return Name.
+     */
+    public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * (space != null ? space.hashCode() : 0) + (name != null ? name.hashCode() : 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        QueryIndexKey other = (QueryIndexKey)o;
+
+        return F.eq(name, other.name) && F.eq(space, other.space);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(QueryIndexKey.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
new file mode 100644
index 0000000..395f077
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QuerySchema.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexCreateOperation;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperation;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Dynamic cache schema: a collection of query entities that is read and updated under an internal mutex.
+ */
+public class QuerySchema implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Query entities. */
+    private final Collection<QueryEntity> entities = new LinkedList<>();
+
+    /** Mutex for state synchronization. */
+    private final Object mux = new Object();
+
+    /**
+     * Default constructor; creates a schema without query entities.
+     */
+    public QuerySchema() {
+        // No-op.
+    }
+
+    /**
+     * Creates a schema holding copies (made with the {@code QueryEntity} copy constructor) of the given entities.
+     *
+     * @param entities Query entities; must not be {@code null}.
+     */
+    public QuerySchema(Collection<QueryEntity> entities) {
+        assert entities != null;
+
+        for (QueryEntity qryEntity : entities)
+            this.entities.add(new QueryEntity(qryEntity));
+    }
+
+    /**
+     * Copy object; every query entity is duplicated with the {@code QueryEntity} copy constructor.
+     *
+     * @return Copy.
+     */
+    public QuerySchema copy() {
+        synchronized (mux) {
+            QuerySchema res = new QuerySchema();
+
+            for (QueryEntity qryEntity : entities)
+                res.entities.add(new QueryEntity(qryEntity));
+
+            return res;
+        }
+    }
+
+    /**
+     * Apply the index create or index drop operation carried by a finish message to the stored query entities.
+     *
+     * @param msg Finish message whose operation is applied.
+     */
+    public void finish(SchemaFinishDiscoveryMessage msg) {
+        synchronized (mux) {
+            SchemaAbstractOperation op = msg.operation();
+
+            if (op instanceof SchemaIndexCreateOperation) {
+                SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op;
+
+                for (QueryEntity entity : entities) {
+                    String tblName = QueryUtils.tableName(entity);
+
+                    if (F.eq(tblName, op0.tableName())) {
+                        boolean exists = false;
+
+                        for (QueryIndex idx : entity.getIndexes()) {
+                            if (F.eq(idx.getName(), op0.indexName())) {
+                                exists = true;
+
+                                break;
+                            }
+                        }
+
+                        if (!exists) {
+                            List<QueryIndex> idxs = new ArrayList<>(entity.getIndexes());
+
+                            idxs.add(op0.index());
+
+                            entity.clearIndexes();
+                            entity.setIndexes(idxs);
+                        }
+
+                        break;
+                    }
+                }
+            }
+            else {
+                assert op instanceof SchemaIndexDropOperation;
+
+                SchemaIndexDropOperation op0 = (SchemaIndexDropOperation)op;
+
+                for (QueryEntity entity : entities) {
+                    Collection<QueryIndex> idxs = entity.getIndexes();
+
+                    QueryIndex victim = null;
+
+                    for (QueryIndex idx : idxs) {
+                        if (F.eq(idx.getName(), op0.indexName())) {
+                            victim = idx;
+
+                            break;
+                        }
+                    }
+
+                    if (victim != null) {
+                        List<QueryIndex> newIdxs = new ArrayList<>(entity.getIndexes());
+
+                        newIdxs.remove(victim);
+
+                        entity.clearIndexes();
+                        entity.setIndexes(newIdxs); // Install the list without the victim, not the original collection.
+
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @return New list with the current query entities (the list is a copy, the entities themselves are not).
+     */
+    public Collection<QueryEntity> entities() {
+        synchronized (mux) {
+            return new ArrayList<>(entities);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(QuerySchema.class, this);
+    }
+}
