Repository: ignite Updated Branches: refs/heads/master 054b4eb9d -> 4a2a08b72
IGNITE-945: HTTP REST prepend/append commands failed. Reviewed and merged by Denis Magda ([email protected]) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a2a08b7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a2a08b7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a2a08b7 Branch: refs/heads/master Commit: 4a2a08b729e8939041de658efbeb885a513d10e6 Parents: 054b4eb Author: Saikat Maitra <[email protected]> Authored: Tue Jul 19 12:26:56 2016 +0300 Committer: Denis Magda <[email protected]> Committed: Tue Jul 19 12:26:56 2016 +0300 ---------------------------------------------------------------------- .../handlers/cache/GridCacheCommandHandler.java | 84 ++++++++++++-------- .../GridCacheAtomicCommandHandlerSelfTest.java | 39 +++++++++ .../cache/GridCacheCommandHandlerSelfTest.java | 19 ++++- .../testsuites/IgniteRestHandlerTestSuite.java | 2 + 4 files changed, 110 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4a2a08b7/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index e8179d9..a8062ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -31,6 +31,11 @@ import java.util.UUID; import java.util.concurrent.Callable; import javax.cache.expiry.Duration; import javax.cache.expiry.ModifiedExpiryPolicy; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import javax.cache.processor.MutableEntry; + import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -55,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.rest.GridRestCommand; import org.apache.ignite.internal.processors.rest.GridRestResponse; +import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler; import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandlerAdapter; import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest; import org.apache.ignite.internal.processors.rest.request.GridRestRequest; @@ -100,8 +106,6 @@ import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_S import static org.apache.ignite.internal.processors.rest.GridRestCommand.DESTROY_CACHE; import static org.apache.ignite.internal.processors.rest.GridRestCommand.GET_OR_CREATE_CACHE; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * Command handler for API requests. @@ -195,7 +199,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { private static IgniteInternalFuture<?> appendOrPrepend( final GridKernalContext ctx, final IgniteInternalCache<Object, Object> cache, - final Object key, GridRestCacheRequest req, final boolean prepend) throws IgniteCheckedException { + final Object key, GridRestCacheRequest req, + final boolean prepend) throws IgniteCheckedException { assert cache != null; assert key != null; assert req != null; @@ -206,23 +211,38 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); return ctx.closure().callLocalSafe(new Callable<Object>() { - @Override public Object call() throws Exception { - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - Object curVal = cache.get(key); - - if (curVal == null) - return false; - - // Modify current value with appendix one. - Object newVal = appendOrPrepend(curVal, val, !prepend); - - // Put new value asynchronously. - cache.put(key, newVal); + @Override + public Object call() throws Exception { + EntryProcessorResult<Boolean> res = cache.invoke(key, new EntryProcessor<Object, Object, Boolean>() { + @Override + public Boolean process(MutableEntry<Object, Object> entry, + Object... objects) throws EntryProcessorException { + try { + Object curVal = entry.getValue(); + + if (curVal == null) + return false; + + // Modify current value with appendix one. + Object newVal = appendOrPrepend(curVal, val, !prepend); + + // Put new value asynchronously. + entry.setValue(newVal); + + return true; + } + catch (IgniteCheckedException e) { + throw new EntryProcessorException(e); + } + } + }); - tx.commit(); + try { + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteCheckedException(e.getCause()); } - - return true; } }, false); } @@ -236,7 +256,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { * @return Resulting value. * @throws IgniteCheckedException In case of grid exceptions. */ - private static Object appendOrPrepend(Object origVal, Object appendVal, boolean appendPlc) throws IgniteCheckedException { + private static Object appendOrPrepend(Object origVal, Object appendVal, + boolean appendPlc) throws IgniteCheckedException { // Strings. if (appendVal instanceof String && origVal instanceof String) return appendPlc ? origVal + (String)appendVal : (String)appendVal + origVal; @@ -313,7 +334,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { * @return Instance on the named cache. * @throws IgniteCheckedException If cache not found. */ - private static IgniteInternalCache<Object, Object> cache(Ignite ignite, String cacheName) throws IgniteCheckedException { + private static IgniteInternalCache<Object, Object> cache(Ignite ignite, + String cacheName) throws IgniteCheckedException { IgniteInternalCache<Object, Object> cache = ((IgniteKernal)ignite).getCache(cacheName); if (cache == null) @@ -630,11 +652,10 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } /** - * Executes command on flagged cache projection. Checks {@code destId} to find - * if command could be performed locally or routed to a remote node. + * Executes command on flagged cache projection. Checks {@code destId} to find if command could be performed locally + * or routed to a remote node. * - * @param destId Target node Id for the operation. - * If {@code null} - operation could be executed anywhere. + * @param destId Target node Id for the operation. If {@code null} - operation could be executed anywhere. * @param clientId Client ID. * @param cacheName Cache name. * @param skipStore Skip store. @@ -655,7 +676,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { destId == null || destId.equals(ctx.localNodeId()) || replicatedCacheAvailable(cacheName); if (locExec) { - IgniteInternalCache<?,?> prj = localCache(cacheName).forSubjectId(clientId).setSkipStore(skipStore); + IgniteInternalCache<?, ?> prj = localCache(cacheName).forSubjectId(clientId).setSkipStore(skipStore); return op.apply((IgniteInternalCache<Object, Object>)prj, ctx). chain(resultWrapper((IgniteInternalCache<Object, Object>)prj, key)); @@ -672,11 +693,10 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } /** - * Executes command on cache. Checks {@code destId} to find - * if command could be performed locally or routed to a remote node. + * Executes command on cache. Checks {@code destId} to find if command could be performed locally or routed to a + * remote node. * - * @param destId Target node Id for the operation. - * If {@code null} - operation could be executed anywhere. + * @param destId Target node Id for the operation. If {@code null} - operation could be executed anywhere. * @param clientId Client ID. * @param cacheName Cache name. * @param key Key to set affinity mapping in the response. @@ -719,7 +739,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { * @return If replicated cache with given name is locally available. */ private boolean replicatedCacheAvailable(String cacheName) { - GridCacheAdapter<Object,Object> cache = ctx.cache().internalCache(cacheName); + GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); return cache != null && cache.configuration().getCacheMode() == CacheMode.REPLICATED; } @@ -1049,7 +1069,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { /** Key. */ protected final Object key; - /** Value.*/ + /** Value. */ protected final Object val; /** @@ -1496,7 +1516,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { - return c.sizeAsync(new CachePeekMode[]{CachePeekMode.PRIMARY}); + return c.sizeAsync(new CachePeekMode[] {CachePeekMode.PRIMARY}); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a2a08b7/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheAtomicCommandHandlerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheAtomicCommandHandlerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheAtomicCommandHandlerSelfTest.java new file mode 100644 index 0000000..539b189 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheAtomicCommandHandlerSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.cache; + +import org.apache.ignite.cache.CacheAtomicityMode; + +/** + * Tests command handler directly for atomic cache. + */ +public class GridCacheAtomicCommandHandlerSelfTest extends GridCacheCommandHandlerSelfTest { + /** + * Constructor. + */ + public GridCacheAtomicCommandHandlerSelfTest() { + super(); + } + + /** + * @return CacheAtomicityMode for the cache. + */ + @Override protected CacheAtomicityMode atomicityMode(){ + return CacheAtomicityMode.ATOMIC; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4a2a08b7/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java index 3d00626..eadf674 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.ConnectorConfiguration; @@ -46,6 +47,8 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import javax.cache.processor.EntryProcessorException; + /** * Tests command handler directly. */ @@ -70,6 +73,8 @@ public class GridCacheCommandHandlerSelfTest extends GridCommonAbstractTest { cacheCfg.setCacheMode(CacheMode.LOCAL); + cacheCfg.setAtomicityMode(atomicityMode()); + // Grid config. IgniteConfiguration cfg = super.getConfiguration(); @@ -86,6 +91,14 @@ public class GridCacheCommandHandlerSelfTest extends GridCommonAbstractTest { } /** + * + * @return CacheAtomicityMode for the cache. + */ + protected CacheAtomicityMode atomicityMode(){ + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** * Tests the cache failure during the execution of the CACHE_GET command. * * @throws Exception If failed. @@ -150,6 +163,8 @@ public class GridCacheCommandHandlerSelfTest extends GridCommonAbstractTest { catch (IgniteCheckedException e) { info("Got expected exception: " + e); + e.printStackTrace(); + assertTrue(e.getMessage().startsWith("Incompatible types")); } } @@ -164,7 +179,7 @@ public class GridCacheCommandHandlerSelfTest extends GridCommonAbstractTest { * @return Resulting value in cache. * @throws IgniteCheckedException In case of any grid exception. */ - private <T> T testAppend(T curVal, T newVal, boolean append) throws IgniteCheckedException { + private <T> T testAppend(T curVal, T newVal, boolean append) throws IgniteCheckedException, EntryProcessorException { GridRestCommandHandler hnd = new GridCacheCommandHandler(((IgniteKernal)grid()).context()); String key = UUID.randomUUID().toString(); @@ -185,7 +200,7 @@ public class GridCacheCommandHandlerSelfTest extends GridCommonAbstractTest { jcache().put(key, curVal); // Validate behavior for initialized cache (has current value). - assertTrue("Expects succeed.", (Boolean)hnd.handleAsync(req).get().getResponse()); + assertTrue((Boolean) hnd.handleAsync(req).get().getResponse()); } finally { res = (T)jcache().getAndRemove(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/4a2a08b7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java index d5de376..42c6752 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteRestHandlerTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheAtomicCommandHandlerSelfTest; import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheCommandHandlerSelfTest; import org.apache.ignite.internal.processors.rest.handlers.log.GridLogCommandHandlerTest; @@ -33,6 +34,7 @@ public class IgniteRestHandlerTestSuite extends TestSuite { TestSuite suite = new TestSuite("REST Support Test Suite"); suite.addTestSuite(GridCacheCommandHandlerSelfTest.class); + suite.addTestSuite(GridCacheAtomicCommandHandlerSelfTest.class); suite.addTestSuite(GridLogCommandHandlerTest.class); return suite;
