IGNITE-6219 - IgniteCache#loadCache executes local load in caller thread
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e63f596 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e63f596 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e63f596 Branch: refs/heads/ignite-5896 Commit: 0e63f59686178d7ac025babd62580c8386dbd8f3 Parents: 788adc0 Author: dkarachentsev <[email protected]> Authored: Wed Sep 6 11:54:19 2017 +0300 Committer: Tikhonov Nikolay <[email protected]> Committed: Wed Sep 6 11:54:19 2017 +0300 ---------------------------------------------------------------------- .../processors/task/GridTaskWorker.java | 22 +++- .../cache/store/GridStoreLoadCacheTest.java | 120 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 5 +- 3 files changed, 145 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e63f596/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 74fe57d..b94a427 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -574,7 +574,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { if (F.isEmpty(jobs)) return; - Collection<GridJobResultImpl> jobResList = new ArrayList<>(jobs.size()); + List<GridJobResultImpl> jobResList = new ArrayList<>(jobs.size()); Collection<ComputeJobSibling> sibs = new ArrayList<>(jobs.size()); @@ -632,6 +632,26 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { // Set mapped flag. ses.onMapped(); + // Move local jobs to the end of the list, because + // they will be invoked in current thread that will hold other + // jobs. + int jobResSize = jobResList.size(); + + if (jobResSize > 1) { + UUID locId = ctx.discovery().localNode().id(); + + for (int i = 0; i < jobResSize; i++) { + UUID jobNodeId = jobResList.get(i).getNode().id(); + + if (jobNodeId.equals(locId) && i < jobResSize - 1) { + Collections.swap(jobResList, i, jobResSize - 1); + + jobResSize--; + i--; + } + } + } + // Send out all remote mappedJobs. for (GridJobResultImpl res : jobResList) { evtLsnr.onJobSend(this, res.getSibling()); http://git-wip-us.apache.org/repos/asf/ignite/blob/0e63f596/modules/core/src/test/java/org/apache/ignite/cache/store/GridStoreLoadCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/GridStoreLoadCacheTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/GridStoreLoadCacheTest.java new file mode 100644 index 0000000..d88c431 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/GridStoreLoadCacheTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test checks that local cacheLoad task never blocks remote + * cacheLoad. + */ +public class GridStoreLoadCacheTest extends GridCommonAbstractTest { + /** Barrier. */ + private static final CyclicBarrier BARRIER = new CyclicBarrier(3); + + /** Cache name. */ + public static final String CACHE_NAME = "test"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + //noinspection unchecked + cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME).setCacheStoreFactory(new TestFactory())); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void test() throws Exception { + for (int i = 0; i < 3; i++) { + IgniteEx srv1 = startGrid(0); + startGrid(1); + startGrid(2); + + awaitPartitionMapExchange(); + + srv1.cache(CACHE_NAME).loadCache(null); + + stopAllGrids(); + } + } + + /** + * + */ + private static class TestFactory implements Factory<CacheStore> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new TestStore(); + } + } + + /** + * + */ + private static class TestStore extends CacheStoreAdapter<Object, Object> { + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) { + try { + BARRIER.await(3, TimeUnit.SECONDS); + } + catch (InterruptedException | BrokenBarrierException | TimeoutException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException { + // No-op + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + // No-op + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e63f596/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index dea0eb0..e8810bb 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -22,6 +22,7 @@ import junit.framework.TestSuite; import org.apache.ignite.cache.IgniteWarmupClosureSelfTest; import org.apache.ignite.cache.store.GridCacheBalancingStoreSelfTest; import org.apache.ignite.cache.store.GridCacheLoadOnlyStoreAdapterSelfTest; +import org.apache.ignite.cache.store.GridStoreLoadCacheTest; import org.apache.ignite.cache.store.StoreResourceInjectionSelfTest; import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerSelfTest; import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerStoreKeepBinarySelfTest; @@ -82,8 +83,8 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicInvokeTest; import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicLocalInvokeTest; import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicLocalWithStoreInvokeTest; import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicNearEnabledInvokeTest; -import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicWithStoreInvokeTest; import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicStopBusySelfTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicWithStoreInvokeTest; import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicLocalTest; import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicReplicatedTest; import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerAtomicTest; @@ -314,6 +315,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteIncompleteCacheObjectSelfTest.class); + suite.addTestSuite(GridStoreLoadCacheTest.class); + return suite; } }
