ignite-2542 'Dummy' merge index should not try to access h2 table.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9c4f8110 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9c4f8110 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9c4f8110 Branch: refs/heads/ignite-1786 Commit: 9c4f8110eb8bc76ce4a28ffb522263339a71606c Parents: 46b6a76 Author: sboikov <[email protected]> Authored: Mon Feb 15 11:00:44 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Feb 15 11:00:44 2016 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMergeIndex.java | 32 +++- .../h2/twostep/GridMergeIndexUnsorted.java | 19 ++- .../query/h2/twostep/GridMergeTable.java | 16 +- .../h2/twostep/GridReduceQueryExecutor.java | 2 +- .../near/IgniteCacheQueryNodeFailTest.java | 148 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 6 files changed, 191 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9c4f8110/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index 12c2240..3914bd7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.CacheException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; import org.h2.engine.Constants; import org.h2.engine.Session; import org.h2.index.BaseIndex; @@ -69,21 +70,31 @@ public abstract class GridMergeIndex extends BaseIndex { /** */ private int fetchedCnt; + /** */ + private final GridKernalContext ctx; + /** + * @param ctx Context. * @param tbl Table. * @param name Index name. * @param type Type. * @param cols Columns. */ - public GridMergeIndex(GridMergeTable tbl, String name, IndexType type, IndexColumn[] cols) { + public GridMergeIndex(GridKernalContext ctx, + GridMergeTable tbl, + String name, + IndexType type, + IndexColumn[] cols) { + this.ctx = ctx; + initBaseIndex(tbl, 0, name, cols, type); } /** - * + * @param ctx Context. */ - protected GridMergeIndex() { - // No-op. + protected GridMergeIndex(GridKernalContext ctx) { + this.ctx = ctx; } /** @@ -94,6 +105,19 @@ public abstract class GridMergeIndex extends BaseIndex { } /** + * Fails index if any source node is left. + */ + protected final void checkSourceNodesAlive() { + for (UUID nodeId : sources()) { + if (!ctx.discovery().alive(nodeId)) { + fail(nodeId); + + return; + } + } + } + + /** * @param nodeId Node ID. * @return {@code true} If this index needs data from the given source node. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9c4f8110/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index 501480a..5639340 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import javax.cache.CacheException; +import org.apache.ignite.internal.GridKernalContext; import org.h2.index.Cursor; import org.h2.index.IndexType; import org.h2.result.Row; @@ -40,25 +41,27 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { private final BlockingQueue<GridResultPage> queue = new LinkedBlockingQueue<>(); /** + * @param ctx Context. * @param tbl Table. * @param name Index name. */ - public GridMergeIndexUnsorted(GridMergeTable tbl, String name) { - super(tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns())); + public GridMergeIndexUnsorted(GridKernalContext ctx, GridMergeTable tbl, String name) { + super(ctx, tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns())); } /** + * @param ctx Context. * @return Dummy index instance. */ - public static GridMergeIndexUnsorted createDummy() { - return new GridMergeIndexUnsorted(); + public static GridMergeIndexUnsorted createDummy(GridKernalContext ctx) { + return new GridMergeIndexUnsorted(ctx); } /** - * + * @param ctx Context. */ - private GridMergeIndexUnsorted() { - // No-op. + private GridMergeIndexUnsorted(GridKernalContext ctx) { + super(ctx); } /** {@inheritDoc} */ @@ -94,7 +97,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { if (page != null) break; - ((GridMergeTable)table).checkSourceNodesAlive(); + checkSourceNodesAlive(); } if (page.isLast()) http://git-wip-us.apache.org/repos/asf/ignite/blob/9c4f8110/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java index adfd220..a86cbcd 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import java.util.ArrayList; -import java.util.UUID; import org.apache.ignite.internal.GridKernalContext; import org.h2.command.ddl.CreateTableData; import org.h2.engine.Session; @@ -47,20 +46,7 @@ public class GridMergeTable extends TableBase { super(data); this.ctx = ctx; - idx = new GridMergeIndexUnsorted(this, "merge_scan"); - } - - /** - * Fails merge table if any source node is left. - */ - public void checkSourceNodesAlive() { - for (UUID nodeId : idx.sources()) { - if (!ctx.discovery().alive(nodeId)) { - idx.fail(nodeId); - - return; - } - } + idx = new GridMergeIndexUnsorted(ctx, this, "merge_scan"); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9c4f8110/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 828d9bd..47ab083 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -532,7 +532,7 @@ public class GridReduceQueryExecutor { fakeTable(r.conn, tblIdx++).setInnerTable(tbl); } else - idx = GridMergeIndexUnsorted.createDummy(); + idx = GridMergeIndexUnsorted.createDummy(ctx); for (ClusterNode node : nodes) idx.addSource(node.id()); http://git-wip-us.apache.org/repos/asf/ignite/blob/9c4f8110/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeFailTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeFailTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeFailTest.java new file mode 100644 index 0000000..b601954 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeFailTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test added to check for https://issues.apache.org/jira/browse/IGNITE-2542. + */ +public class IgniteCacheQueryNodeFailTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + ccfg.setBackups(0); + ccfg.setIndexedTypes(Integer.class, Integer.class); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(0); + + client = true; + + startGrid(1); + + client = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testNodeFailedSimpleQuery()throws Exception { + checkNodeFailed("select _key from Integer"); + } + + /** + * @throws Exception If failed. + */ + public void testNodeFailedReduceQuery()throws Exception { + checkNodeFailed("select avg(_key) from Integer"); + } + + /** + * @param qry Query. + * @throws Exception If failed. + */ + private void checkNodeFailed(final String qry) throws Exception { + Ignite failSrv = startGrid(2); + + awaitPartitionMapExchange(); + + assertFalse(failSrv.configuration().isClientMode()); + + Ignite client = grid(1); + + final IgniteCache<Integer, Integer> cache = client.cache(null); + + for (int i = 0; i < 100_000; i++) + cache.put(i, i); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + SqlFieldsQuery fieldsQry = new SqlFieldsQuery(qry); + + while (!stop.get()) { + try { + cache.query(fieldsQry).getAll(); + } + catch (CacheException e) { + Throwable cause = e.getCause(); + + assertFalse("Unexpected cause: " + cause, + cause instanceof NullPointerException); + } + } + + return null; + } + }, 20, "qry-thread"); + + try { + failSrv.close(); + + U.sleep(100); + } + finally { + stop.set(true); + } + + fut.get(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9c4f8110/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index f11c2f8..84a9a45 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheP import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQueryP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQuerySelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeFailTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest2; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest; @@ -159,6 +160,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); + suite.addTestSuite(IgniteCacheQueryNodeFailTest.class); suite.addTestSuite(IgniteCacheClientQueryReplicatedNodeRestartSelfTest.class); suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class); suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class);
