Cleaning up indexing module.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f39de03c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f39de03c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f39de03c Branch: refs/heads/ignite-4565-ddl Commit: f39de03c6d12ae01a77a162f83a6c95742cb8f27 Parents: b35568e Author: devozerov <[email protected]> Authored: Wed Mar 22 13:04:18 2017 +0300 Committer: devozerov <[email protected]> Committed: Wed Mar 22 13:04:18 2017 +0300 ---------------------------------------------------------------------- .../processors/query/h2/IgniteH2Indexing.java | 6 +- .../query/h2/ddl/DdlStatementsProcessor.java | 466 ++----------------- .../h2/ddl/msg/DdlInitDiscoveryMessage.java | 2 +- .../query/h2/ddl/GridDdlProtoTest.java | 188 -------- 4 files changed, 30 insertions(+), 632 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 07c0650..e61bfd6 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1498,7 +1498,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (DdlStatementsProcessor.isDdlStatement(prepared)) { try { - return ddlProc.runDdlStatement(stmt); + return ddlProc.runDdlStatement(cctx.name(), stmt); } catch (IgniteCheckedException e) { throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sqlQry + ']', @@ -2200,10 +2200,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { log.debug("Stopping cache query index..."); // unregisterMBean(); TODO https://issues.apache.org/jira/browse/IGNITE-2139 - - if (ddlProc != null) - ddlProc.stop(); - for (Schema schema : schemas.values()) schema.onDrop(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java index 8bb831b..32e6da5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java @@ -17,50 +17,27 @@ package org.apache.ignite.internal.processors.query.h2.ddl; -import java.sql.PreparedStatement; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.GridTopic; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; -import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.discovery.CustomEventListener; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.query.IgniteSQLException; -import org.apache.ignite.internal.processors.query.ddl.DdlOperationNodeResult; -import org.apache.ignite.internal.processors.query.ddl.DdlOperationResult; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; -import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlAckDiscoveryMessage; -import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlInitDiscoveryMessage; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateIndex; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropIndex; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.thread.IgniteThread; import org.h2.command.Prepared; import org.h2.command.ddl.CreateIndex; import org.h2.command.ddl.DropIndex; import org.h2.jdbc.JdbcPreparedStatement; -import org.jetbrains.annotations.Nullable; + +import java.sql.PreparedStatement; +import java.util.Collections; +import java.util.List; import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META; @@ -75,15 +52,6 @@ public class DdlStatementsProcessor { /** Logger. */ private IgniteLogger log; - /** State flag. */ - private AtomicBoolean isStopped = new AtomicBoolean(); - - /** Running operations originating at this node as a client. */ - private Map<IgniteUuid, GridFutureAdapter> operations = new ConcurrentHashMap<>(); - - /** Worker. */ - private volatile DdlWorker worker; - /** * Initialize message handlers and this' fields needed for further operation. * @@ -94,378 +62,49 @@ public class DdlStatementsProcessor { this.ctx = ctx; log = ctx.log(DdlStatementsProcessor.class); - - worker = new DdlWorker(ctx.igniteInstanceName(), log); - - IgniteThread workerThread = new IgniteThread(worker); - - workerThread.setDaemon(true); - - workerThread.start(); - - ctx.discovery().setCustomEventListener(DdlInitDiscoveryMessage.class, - new CustomEventListener<DdlInitDiscoveryMessage>() { - /** {@inheritDoc} */ - @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"}) - @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, - DdlInitDiscoveryMessage msg) { - onInit(msg); - } - }); - - ctx.discovery().setCustomEventListener(DdlAckDiscoveryMessage.class, - new CustomEventListener<DdlAckDiscoveryMessage>() { - /** {@inheritDoc} */ - @Override public void onCustomEvent(AffinityTopologyVersion topVer, final ClusterNode snd, - final DdlAckDiscoveryMessage msg) { - submitTask(new DdlTask() { - @Override public void run() { - onAck(snd, msg); - } - }); - } - }); - - ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { - /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { - if (msg instanceof DdlOperationResult) { - DdlOperationResult res = (DdlOperationResult) msg; - - onResult(res.getOperationId(), bytesToException(res.getError())); - } - - if (msg instanceof DdlOperationNodeResult) { - DdlOperationNodeResult res = (DdlOperationNodeResult) msg; - - onNodeResult(res.getOperationId(), bytesToException(res.getError())); - } - } - }); - } - - /** - * Submit a task to {@link #worker} for async execution. - * - * @param task Task. - */ - private void submitTask(DdlTask task) { - DdlWorker worker0 = worker; - - if (worker0 != null) - worker0.submit(task); - else - log.debug("Cannot submit DDL task because worker is null (node is stopping): " + task); - } - - /** - * Handle {@code ACK} message on a <b>peer node</b> - do local portion of actual DDL job and notify - * <b>coordinator</b> about success or failure. - * - * @param snd Sender. - * @param msg Message. - */ - @SuppressWarnings({"ThrowableInstanceNeverThrown", "unchecked"}) - private void onAck(ClusterNode snd, DdlAckDiscoveryMessage msg) { - // Don't do anything if we didn't choose to participate. - if (!msg.nodeIds().contains(ctx.localNodeId())) - return; - - IgniteCheckedException ex = null; - - DdlAbstractOperation args = msg.operation(); - - try { - doAck(args); - } - catch (Throwable e) { - ex = wrapThrowableIfNeeded(e); - } - - try { - DdlOperationNodeResult res = new DdlOperationNodeResult(); - - res.setOperationId(msg.operation().operationId()); - res.setError(exceptionToBytes(ex)); - - ctx.io().sendToGridTopic(snd, GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL); - } - catch (Throwable e) { - U.error(log, "Failed to notify coordinator about local DLL operation completion [opId=" + - msg.operation().operationId() + ", clientNodeId=" + snd.id() + ']', e); - } - } - - /** - * Perform local portion of DDL operation. - * Exists as a separate method to allow overriding it in tests to check behavior in case of errors. - * - * @param args Operation arguments. - * @throws IgniteCheckedException if failed. - */ - @SuppressWarnings("unchecked") - void doAck(DdlAbstractOperation args) throws IgniteCheckedException { - if (args instanceof DdlCreateIndexOperation) { - // No-op. - } - } - - /** - * Handle local DDL operation result from <b>a peer node</b> on <b>the coordinator</b>. - * - * @param opId DDL operation ID. - * @param err Exception that occurred on the <b>peer</b>, or null if the local operation has been successful. - */ - @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "SynchronizationOnLocalVariableOrMethodParameter", "ForLoopReplaceableByForEach"}) - private void onNodeResult(IgniteUuid opId, IgniteCheckedException err) { - // No-op. - } - - /** - * Process result of executing {@link DdlInitDiscoveryMessage} and react accordingly. - * Called from {@link DdlInitDiscoveryMessage#ackMessage()}. - * - * @param msg {@link DdlInitDiscoveryMessage} message. - * @return {@link DiscoveryCustomMessage} to return from {@link DdlInitDiscoveryMessage#ackMessage()}. - */ - @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "UnnecessaryInitCause"}) - public DiscoveryCustomMessage onInitFinished(DdlInitDiscoveryMessage msg) { - Map<UUID, IgniteCheckedException> nodesState = msg.nodeState(); - - assert nodesState != null; - - Map<UUID, IgniteCheckedException> errors = new HashMap<>(); - - for (Map.Entry<UUID, IgniteCheckedException> e : nodesState.entrySet()) - if (e.getValue() != null) - errors.put(e.getKey(), e.getValue()); - - if (!errors.isEmpty()) { - IgniteCheckedException resEx = new IgniteCheckedException("DDL operation has been cancelled at INIT stage"); - - if (errors.size() > 1) { - for (IgniteCheckedException e : errors.values()) - resEx.addSuppressed(e); - } - else - resEx.initCause(errors.values().iterator().next()); - - sendResult(msg.operation(), resEx); - - return null; - } - else - return new DdlAckDiscoveryMessage(msg.operation(), msg.nodeState().keySet()); - } - - /** - * Notify client about result. - * - * @param args Operation arguments. - * @param err Error, if any. - */ - private void sendResult(DdlAbstractOperation args, IgniteCheckedException err) { - assert args != null; - - DdlOperationResult res = new DdlOperationResult(); - - res.setError(exceptionToBytes(err)); - res.setOperationId(args.operationId()); - - try { - ctx.io().sendToGridTopic(args.clientNodeId(), GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to notify client node about DDL operation failure " + - "[opId=" + args.operationId() + ", clientNodeId=" + args.clientNodeId() + ']', e); - } - } - - /** - * Callback handling whole DDL operation result <b>on the client</b>. - * - * @param opId DDL operation ID. - * @param err Error, if any. - */ - @SuppressWarnings("unchecked") - private void onResult(IgniteUuid opId, IgniteCheckedException err) { - GridFutureAdapter fut = operations.get(opId); - - if (fut == null) { - U.warn(log, "DDL operation not found at its client [opId=" + opId + ", nodeId=" + ctx.localNodeId() + ']'); - - return; - } - - fut.onDone(null, err); - } - - /** - * Perform preliminary actions and checks for {@code INIT} stage of DDL statement execution <b>on a peer node</b>. - * - * @param msg {@code INIT} message. - */ - @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - private void onInit(DdlInitDiscoveryMessage msg) { - try { - // Let's tell everyone that we're participating if our init is successful... - if (doInit(msg.operation())) - msg.nodeState().put(ctx.localNodeId(), null); - } - catch (Throwable e) { - // Or tell everyone about the error that occurred - msg.nodeState().put(ctx.localNodeId(), wrapThrowableIfNeeded(e)); - } - } - - /** - * Perform actual INIT actions. - * Exists as a separate method to allow overriding it in tests to check behavior in case of errors. - * - * @param args Operation arguments. - * @throws IgniteCheckedException if failed. - */ - @SuppressWarnings("unchecked") - boolean doInit(DdlAbstractOperation args) throws IgniteCheckedException { - if (args instanceof DdlCreateIndexOperation) { - return true; - } - - return false; - } - - /** - * Optionally wrap a {@link Throwable} into an {@link IgniteCheckedException}. - * - * @param e Throwable to wrap. - * @return {@code e} if it's an {@link IgniteCheckedException} or an {@link IgniteCheckedException} wrapping it. - */ - private static IgniteCheckedException wrapThrowableIfNeeded(Throwable e) { - if (e instanceof IgniteCheckedException) - return (IgniteCheckedException) e; - else - return new IgniteCheckedException(e); - } - - /** - * Do cleanup. - */ - public void stop() throws IgniteCheckedException { - if (!isStopped.compareAndSet(false, true)) - throw new IgniteCheckedException(new IllegalStateException("DDL processor has been stopped already")); - - DdlWorker worker0 = worker; - - if (worker0 != null) { - worker0.cancel(); - - worker = null; - } - - for (Map.Entry<IgniteUuid, GridFutureAdapter> e : operations.entrySet()) - e.getValue().onDone(new IgniteCheckedException("Operation has been cancelled [opId=" + e.getKey() +']')); } /** * Execute DDL statement. * + * @param cacheName Cache name. * @param stmt H2 statement to parse and execute. */ @SuppressWarnings("unchecked") - public QueryCursor<List<?>> runDdlStatement(PreparedStatement stmt) + public QueryCursor<List<?>> runDdlStatement(String cacheName, PreparedStatement stmt) throws IgniteCheckedException { - if (isStopped.get()) - throw new IgniteCheckedException(new IllegalStateException("DDL processor has been stopped")); - assert stmt instanceof JdbcPreparedStatement; - GridSqlStatement gridStmt = new GridSqlQueryParser(false).parse(GridSqlQueryParser.prepared(stmt)); - - DdlAbstractOperation op; - - if (gridStmt instanceof GridSqlCreateIndex) { - GridSqlCreateIndex createIdx = (GridSqlCreateIndex) gridStmt; - - op = new DdlCreateIndexOperation(IgniteUuid.randomUuid(), ctx.localNodeId(), createIdx.index(), - createIdx.schemaName(), createIdx.tableName(), createIdx.ifNotExists()); - } - else if (gridStmt instanceof GridSqlDropIndex) - throw new UnsupportedOperationException("DROP INDEX"); - else - throw new IgniteSQLException("Unexpected DDL operation [type=" + gridStmt.getClass() + ']', - IgniteQueryErrorCode.UNEXPECTED_OPERATION); - - GridFutureAdapter opFut = new GridFutureAdapter(); - - operations.put(op.operationId(), opFut); + IgniteInternalFuture fut; try { - ctx.discovery().sendCustomEvent(new DdlInitDiscoveryMessage(op)); + GridSqlStatement gridStmt = new GridSqlQueryParser(false).parse(GridSqlQueryParser.prepared(stmt)); - opFut.get(); - } - finally { - operations.remove(op.operationId()); - } + if (gridStmt instanceof GridSqlCreateIndex) { + GridSqlCreateIndex createIdx = (GridSqlCreateIndex) gridStmt; - QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList - (Collections.singletonList(0L)), null, false); - - resCur.fieldsMeta(UPDATE_RESULT_META); - - return resCur; - } - - /** - * Serialize exception or at least its message to bytes. - * - * @param ex Exception. - * @return Serialized exception. - */ - private byte[] exceptionToBytes(IgniteCheckedException ex) { - if (ex == null) - return null; - - try { - return U.marshal(ctx, ex); - } - catch (IgniteCheckedException e) { - IgniteCheckedException resEx; - - // Let's try to serialize at least the message - try { - resEx = new IgniteCheckedException("Failed to serialize exception " + - "[msg=" + ex.getMessage() + ']'); - } - catch (Throwable ignored) { - resEx = new IgniteCheckedException("Failed to serialize exception"); + // TODO: How to handle schema name properly? + fut = ctx.cache().dynamicIndexCreate( + cacheName, createIdx.tableName(), createIdx.index(), createIdx.ifNotExists()); } + else if (gridStmt instanceof GridSqlDropIndex) + throw new UnsupportedOperationException("DROP INDEX"); + else + throw new IgniteSQLException("Unexpected DDL operation [type=" + gridStmt.getClass() + ']', + IgniteQueryErrorCode.UNEXPECTED_OPERATION); - try { - return U.marshal(ctx, resEx); - } - catch (IgniteCheckedException exx) { - // Why would it fail? We've sanitized it... - throw new AssertionError(exx); - } - } - } + fut.get(); - /** - * Deserialize exception from bytes. - * - * @param ex Exception. - * @return Serialized exception. - */ - private IgniteCheckedException bytesToException(byte[] ex) { - if (ex == null) - return null; + QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList + (Collections.singletonList(0L)), null, false); - try { - return U.unmarshal(ctx, ex, U.resolveClassLoader(ctx.config())); + resCur.fieldsMeta(UPDATE_RESULT_META); + + return resCur; } - catch (Throwable e) { - return new IgniteCheckedException("Failed to deserialize exception", e); + catch (Exception e) { + // TODO: Proper error handling. + throw new IgniteSQLException("DLL operation failed.", e); } } @@ -476,53 +115,4 @@ public class DdlStatementsProcessor { public static boolean isDdlStatement(Prepared cmd) { return cmd instanceof CreateIndex || cmd instanceof DropIndex; } - - /** - * DDL worker. - */ - private class DdlWorker extends GridWorker { - /** Worker queue. */ - private final BlockingQueue<DdlTask> queue = new LinkedBlockingDeque<>(); - - /** - * Constructor. - * - * @param gridName Gird name. - * @param log Logger. - */ - public DdlWorker(@Nullable String gridName, IgniteLogger log) { - super(gridName, "indexing-ddl-worker", log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!isCancelled()) { - DdlTask task = queue.take(); - - try { - task.run(); - } - catch (Exception e) { - U.error(log, "Unexpected exception during DDL task processing [task=" + task + ']', e); - } - catch (Throwable t) { - U.error(log, "Unexpected error during DDL task processing (worker will be stopped) [task=" + - task + ']', t); - - throw t; - } - } - } - - /** - * Submit task. - * - * @param task Task. - */ - public void submit(DdlTask task) { - assert task != null; - - queue.add(task); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java index 753eb0c..4e00d54 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/msg/DdlInitDiscoveryMessage.java @@ -61,7 +61,7 @@ public class DdlInitDiscoveryMessage extends DdlAbstractDiscoveryMessage impleme /** {@inheritDoc} */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return ((IgniteH2Indexing)ctx.query().getIndexing()).getDdlStatementsProcessor().onInitFinished(this); + return null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/f39de03c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java deleted file mode 100644 index d507dff..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ddl/GridDdlProtoTest.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.ddl; - -import java.io.Serializable; -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.cache.query.annotations.QuerySqlField; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.processors.query.IgniteSQLException; -import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; -import org.apache.ignite.internal.processors.query.h2.ddl.msg.DdlInitDiscoveryMessage; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * - */ -public class GridDdlProtoTest extends GridCommonAbstractTest { - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected void beforeTestsStarted() throws Exception { - IgniteH2Indexing.ddlProcCls = DdlProc.class; - - startGridsMultiThreaded(3, true); - - ignite(0).createCache(cacheConfig("S2P", true, false).setIndexedTypes(String.class, Person.class)); - - startGrid(getTestIgniteInstanceName(3), getConfiguration().setClientMode(true)); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - DdlProc.testName = null; - - ignite(0).cache("S2P").clear(); - - ignite(0).cache("S2P").put("FirstKey", new Person(1, "John", "White")); - ignite(0).cache("S2P").put("SecondKey", new Person(2, "Joe", "Black")); - ignite(0).cache("S2P").put("k3", new Person(3, "Sylvia", "Green")); - ignite(0).cache("S2P").put("f0u4thk3y", new Person(4, "Jane", "Silver")); - } - - /** Test behavior in case of INIT failure (cancel via {@link DdlInitDiscoveryMessage#ackMessage}). */ - public void testInitFailure() { - DdlProc.testName = GridTestUtils.getGridTestName(); - - assertCreateIndexThrowsWithMessage("DDL operation has been cancelled at INIT stage", false); - } - - /** - * Test error handling. - * - * @param msg Expected message. - * @param loc Run query locally on single node. - */ - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - private void assertCreateIndexThrowsWithMessage(String msg, final boolean loc) { - final Throwable e = GridTestUtils.assertThrows(null, new Callable<Object>() { - /** {@inheritDoc} */ - @Override public Object call() throws Exception { - ignite(3).cache("S2P").query(new SqlFieldsQuery("create index idx on Person(id desc)").setLocal(loc)); - return null; - } - }, IgniteSQLException.class, "Failed to execute DDL statement"); - - GridTestUtils.assertThrows(null, new Callable<Object>() { - @Override public Object call() throws Exception { - throw (Exception) e.getCause(); - } - }, IgniteCheckedException.class, msg); - } - - /** - * @param name Cache name. - * @param partitioned Partition or replicated cache. - * @param escapeSql whether identifiers should be quoted - see {@link CacheConfiguration#setSqlEscapeAll} - * @return Cache configuration. - */ - protected static CacheConfiguration cacheConfig(String name, boolean partitioned, boolean escapeSql) { - return new CacheConfiguration() - .setName(name) - .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED) - .setAtomicityMode(CacheAtomicityMode.ATOMIC) - .setBackups(1) - .setSqlEscapeAll(escapeSql); - } - - /** - * - */ - static class Person implements Serializable { - /** */ - public Person(int id, String name, String secondName) { - this.id = id; - this.name = name; - this.secondName = secondName; - } - - /** */ - @QuerySqlField - protected int id; - - /** */ - @QuerySqlField(name = "firstName") - protected final String name; - - /** */ - @QuerySqlField - final String secondName; - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Person person = (Person) o; - - return id == person.id && name.equals(person.name) && secondName.equals(person.secondName); - - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = id; - res = 31 * res + name.hashCode(); - res = 31 * res + secondName.hashCode(); - return res; - } - } - - /** - * Custom implementation to test behavior on failure during various stages. - */ - public final static class DdlProc extends DdlStatementsProcessor { - /** Name of current test. */ - private static volatile String testName; - - /** {@inheritDoc} */ - @Override boolean doInit(DdlAbstractOperation args) { - // Let's throw an exception on a single node in the ring - if ("InitFailure".equals(testName) && ctx.igniteInstanceName().endsWith("2")) - throw new RuntimeException("Hello from DdlProc Init"); - else - try { - return super.doInit(args); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - /** {@inheritDoc} - * @param args*/ - @Override void doAck(DdlAbstractOperation args) { - if ("AckFailure".equals(testName) && ctx.igniteInstanceName().endsWith("1")) - throw new RuntimeException("Hello from DdlProc Ack"); - else - try { - super.doInit(args); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - } -}
