IGNITE-9442 Collocated IgniteSet#close is not working on non-affinity node.
Signed-off-by: Anton Vinogradov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/507aeb21 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/507aeb21 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/507aeb21 Branch: refs/heads/ignite-9720 Commit: 507aeb21bf6792bb9efb6518f83049ebc7eab53f Parents: 6fd6c32 Author: pereslegin-pa <[email protected]> Authored: Mon Nov 26 18:53:10 2018 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Nov 26 18:53:10 2018 +0300 ---------------------------------------------------------------------- .../CacheDataStructuresManager.java | 15 +++--- .../datastructures/GridCacheSetImpl.java | 4 +- .../GridCacheSetAbstractSelfTest.java | 31 ++++++++++-- ...idCachePartitionedSetWithClientSelfTest.java | 52 ++++++++++++++++++++ ...chePartitionedSetWithNodeFilterSelfTest.java | 37 ++++++++++++++ ...ridCacheReplicatedSetWithClientSelfTest.java | 52 ++++++++++++++++++++ ...acheReplicatedSetWithNodeFilterSelfTest.java | 37 ++++++++++++++ .../IgniteCacheDataStructuresSelfTestSuite.java | 8 +++ 8 files changed, 224 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/507aeb21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 932f000..9c3c4a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.datastructures.GridTransactionalCac import org.apache.ignite.internal.processors.datastructures.SetItemKey; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; @@ -472,7 +473,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { while (true) { AffinityTopologyVersion topVer = cctx.topologyVersionFuture().get(); - Collection<ClusterNode> nodes = CU.affinityNodes(cctx, topVer); + Collection<ClusterNode> nodes = F.view(cctx.discovery().nodes(topVer), node -> !node.isDaemon()); try { cctx.closures().callAsyncNoFailover(BROADCAST, @@ -502,10 +503,12 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { throw e; } + Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer); + try { cctx.closures().callAsyncNoFailover(BROADCAST, new RemoveSetDataCallable(cctx.name(), id, topVer), - nodes, + affNodes, true, 0, false).get(); } @@ -516,7 +519,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { continue; } - else if (!pingNodes(nodes)) { + else if (!pingNodes(affNodes)) { if (log.isDebugEnabled()) log.debug("RemoveSetData job failed and set data node left, will retry: " + e); @@ -650,9 +653,9 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { GridCacheAdapter cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); - assert cache != null : cacheName; - - cache.context().dataStructures().blockSet(setId); + // On non-affinity node cache starts on demand, so it may not be running. + if (cache != null) + cache.context().dataStructures().blockSet(setId); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/507aeb21/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 8aae9d0..97d34f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -514,7 +514,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite * @throws IgniteCheckedException If all cache nodes left grid. */ private Collection<ClusterNode> dataNodes(AffinityTopologyVersion topVer) throws IgniteCheckedException { - if (ctx.isLocal() || ctx.isReplicated()) + assert ctx.isPartitioned() || collocated : "Non-collocated mode is supported only for PARTITIONED caches."; + + if (ctx.isLocal() || (ctx.isReplicated() && ctx.affinityNode())) return Collections.singleton(ctx.localNode()); Collection<ClusterNode> nodes; http://git-wip-us.apache.org/repos/asf/ignite/blob/507aeb21/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java index fef55b8..4e888aa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java @@ -32,6 +32,7 @@ import junit.framework.AssertionFailedError; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCluster; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.CacheMode; @@ -181,13 +182,22 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr * @param collocated Collocation flag. * @throws Exception If failed. */ - private void testCreateRemove(boolean collocated) throws Exception { + protected void testCreateRemove(boolean collocated) throws Exception { + testCreateRemove(collocated, 0); + } + + /** + * @param collocated Collocation flag. + * @param nodeIdx Index of the node from which to create set. + * @throws Exception If failed. + */ + protected void testCreateRemove(boolean collocated, int nodeIdx) throws Exception { for (int i = 0; i < gridCount(); i++) assertNull(grid(i).set(SET_NAME, null)); CollectionConfiguration colCfg0 = config(collocated); - IgniteSet<Integer> set0 = grid(0).set(SET_NAME, colCfg0); + IgniteSet<Integer> set0 = grid(nodeIdx).set(SET_NAME, colCfg0); assertNotNull(set0); @@ -411,10 +421,19 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr * @param collocated Collocation flag. * @throws Exception If failed. */ - private void testIterator(boolean collocated) throws Exception { + protected void testIterator(boolean collocated) throws Exception { + testIterator(collocated, 0); + } + + /** + * @param collocated Collocation flag. + * @param nodeIdx Index of the node from which to create set. + * @throws Exception If failed. + */ + protected void testIterator(boolean collocated, int nodeIdx) throws Exception { CollectionConfiguration colCfg = config(collocated); - final IgniteSet<Integer> set0 = grid(0).set(SET_NAME, colCfg); + final IgniteSet<Integer> set0 = grid(nodeIdx).set(SET_NAME, colCfg); for (int i = 0; i < gridCount(); i++) { IgniteSet<Integer> set = grid(i).set(SET_NAME, null); @@ -839,7 +858,9 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr for (int i = 0; i < 10; i++) set.add(i); - Collection<Integer> c = grid(0).compute().broadcast(new IgniteCallable<Integer>() { + IgniteCluster cluster = grid(0).cluster(); + + Collection<Integer> c = grid(0).compute(cluster).broadcast(new IgniteCallable<Integer>() { @Override public Integer call() throws Exception { assertEquals(SET_NAME, set.name()); http://git-wip-us.apache.org/repos/asf/ignite/blob/507aeb21/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSetWithClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSetWithClientSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSetWithClientSelfTest.java new file mode 100644 index 0000000..97be50d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSetWithClientSelfTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.partitioned; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Tests IgniteSet with client node on {@code PARTITIONED} cache. + */ +public class GridCachePartitionedSetWithClientSelfTest extends GridCachePartitionedSetSelfTest { + /** */ + private static final int CLIENT_NODE_IDX = 4; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setClientMode(getTestIgniteInstanceIndex(igniteInstanceName) == CLIENT_NODE_IDX); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void testCreateRemove(boolean collocated) throws Exception { + testCreateRemove(collocated, CLIENT_NODE_IDX); + } + + /** {@inheritDoc} */ + @Override protected void testIterator(boolean collocated) throws Exception { + testIterator(collocated, CLIENT_NODE_IDX); + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 5; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/507aeb21/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSetWithNodeFilterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSetWithNodeFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSetWithNodeFilterSelfTest.java new file mode 100644 index 0000000..116f383 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedSetWithNodeFilterSelfTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.partitioned; + +import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.util.AttributeNodeFilter; + +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; + +/** + * Tests IgniteSet with node filter on {@code PARTITIONED} cache. + */ +public class GridCachePartitionedSetWithNodeFilterSelfTest extends GridCachePartitionedSetSelfTest { + /** {@inheritDoc} */ + @Override protected CollectionConfiguration collectionConfiguration() { + CollectionConfiguration cfg = super.collectionConfiguration(); + + cfg.setNodeFilter(new AttributeNodeFilter(ATTR_IGNITE_INSTANCE_NAME, getTestIgniteInstanceName(0))); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/507aeb21/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedSetWithClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedSetWithClientSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedSetWithClientSelfTest.java new file mode 100644 index 0000000..0c975ac --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedSetWithClientSelfTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.replicated; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Tests IgniteSet with client node on {@code REPLICATED} cache. + */ +public class GridCacheReplicatedSetWithClientSelfTest extends GridCacheReplicatedSetSelfTest { + /** */ + private static final int CLIENT_NODE_IDX = 4; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setClientMode(getTestIgniteInstanceIndex(igniteInstanceName) == CLIENT_NODE_IDX); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void testCreateRemove(boolean collocated) throws Exception { + testCreateRemove(collocated, CLIENT_NODE_IDX); + } + + /** {@inheritDoc} */ + @Override protected void testIterator(boolean collocated) throws Exception { + testIterator(collocated, CLIENT_NODE_IDX); + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 5; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/507aeb21/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedSetWithNodeFilterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedSetWithNodeFilterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedSetWithNodeFilterSelfTest.java new file mode 100644 index 0000000..1db790b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedSetWithNodeFilterSelfTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.replicated; + +import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.util.AttributeNodeFilter; + +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; + +/** + * Tests IgniteSet with node filter on {@code REPLICATED} cache. + */ +public class GridCacheReplicatedSetWithNodeFilterSelfTest extends GridCacheReplicatedSetSelfTest { + /** {@inheritDoc} */ + @Override protected CollectionConfiguration collectionConfiguration() { + CollectionConfiguration cfg = super.collectionConfiguration(); + + cfg.setNodeFilter(new AttributeNodeFilter(ATTR_IGNITE_INSTANCE_NAME, getTestIgniteInstanceName(0))); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/507aeb21/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java index db74c72..bca706e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java @@ -62,6 +62,8 @@ import org.apache.ignite.internal.processors.cache.datastructures.partitioned.Gr import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedSequenceMultiNodeSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedSetFailoverSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedSetSelfTest; +import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedSetWithClientSelfTest; +import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedSetWithNodeFilterSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedAtomicLongApiSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedCountDownLatchSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedQueueNoBackupsTest; @@ -76,6 +78,8 @@ import org.apache.ignite.internal.processors.cache.datastructures.replicated.Gri import org.apache.ignite.internal.processors.cache.datastructures.replicated.GridCacheReplicatedSequenceApiSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.replicated.GridCacheReplicatedSequenceMultiNodeSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.replicated.GridCacheReplicatedSetSelfTest; +import org.apache.ignite.internal.processors.cache.datastructures.replicated.GridCacheReplicatedSetWithClientSelfTest; +import org.apache.ignite.internal.processors.cache.datastructures.replicated.GridCacheReplicatedSetWithNodeFilterSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedAtomicLongApiSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedCountDownLatchSelfTest; import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedLockSelfTest; @@ -112,6 +116,8 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridCacheReplicatedQueueMultiNodeSelfTest.class)); suite.addTest(new TestSuite(GridCacheReplicatedQueueRotativeMultiNodeTest.class)); suite.addTest(new TestSuite(GridCacheReplicatedSetSelfTest.class)); + suite.addTest(new TestSuite(GridCacheReplicatedSetWithClientSelfTest.class)); + suite.addTest(new TestSuite(GridCacheReplicatedSetWithNodeFilterSelfTest.class)); suite.addTest(new TestSuite(GridCacheReplicatedDataStructuresFailoverSelfTest.class)); suite.addTest(new TestSuite(IgniteReplicatedCountDownLatchSelfTest.class)); suite.addTest(new TestSuite(IgniteReplicatedSemaphoreSelfTest.class)); @@ -129,6 +135,8 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridCachePartitionedQueueCreateMultiNodeSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedSetSelfTest.class)); + suite.addTest(new TestSuite(GridCachePartitionedSetWithClientSelfTest.class)); + suite.addTest(new TestSuite(GridCachePartitionedSetWithNodeFilterSelfTest.class)); suite.addTest(new TestSuite(IgnitePartitionedSetNoBackupsSelfTest.class)); suite.addTest(new TestSuite(GridCachePartitionedAtomicSetSelfTest.class)); suite.addTest(new TestSuite(IgnitePartitionedCountDownLatchSelfTest.class));
