ignite-7239 If a cache update response is not serializable, the future on the requesting node never completes
Signed-off-by: Andrey Gura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/65e1c577 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/65e1c577 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/65e1c577 Branch: refs/heads/ignite-zk Commit: 65e1c577906d41e85aa80cba6271e693aba32d3c Parents: 11508d9 Author: dkarachentsev <[email protected]> Authored: Thu Jan 11 16:36:39 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Thu Jan 11 16:37:12 2018 +0300 ---------------------------------------------------------------------- .../cache/CacheInvokeDirectResult.java | 16 +- .../IgniteCacheFailedUpdateResponseTest.java | 310 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 2 + 3 files changed, 326 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/65e1c577/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java index 05b3c86..17f304e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java @@ -105,8 +105,20 @@ public class CacheInvokeDirectResult implements Message { public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { key.prepareMarshal(ctx.cacheObjectContext()); - if (err != null && errBytes == null) - errBytes = U.marshal(ctx.marshaller(), err); + if (err != null && errBytes == null) { + try { + errBytes = 
U.marshal(ctx.marshaller(), err); + } + catch (IgniteCheckedException e) { + // Try send exception even if it's unable to marshal. + IgniteCheckedException exc = new IgniteCheckedException(err.getMessage()); + + exc.setStackTrace(err.getStackTrace()); + exc.addSuppressed(e); + + errBytes = U.marshal(ctx.marshaller(), exc); + } + } if (res != null) res.prepareMarshal(ctx.cacheObjectContext()); http://git-wip-us.apache.org/repos/asf/ignite/blob/65e1c577/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheFailedUpdateResponseTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheFailedUpdateResponseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheFailedUpdateResponseTest.java new file mode 100644 index 0000000..ebcff7c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheFailedUpdateResponseTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.NotSerializableException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Callable; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryReader; +import org.apache.ignite.binary.BinaryWriter; +import org.apache.ignite.binary.Binarylizable; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CachePartialUpdateException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * Checks that no future hangs on non-serializable exceptions and values. + */ +public class IgniteCacheFailedUpdateResponseTest extends GridCommonAbstractTest { + /** Atomic cache.
*/ + private static final String ATOMIC_CACHE = "atomic"; + + /** Tx cache. */ + private static final String TX_CACHE = "tx"; + + /** Atomic cache. */ + private IgniteCache<Object, Object> atomicCache; + + /** Tx cache. */ + private IgniteCache<Object, Object> txCache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration atomicCfg = new CacheConfiguration(ATOMIC_CACHE); + CacheConfiguration txCfg = new CacheConfiguration(TX_CACHE); + + atomicCfg.setBackups(1); + txCfg.setBackups(1); + + txCfg.setAtomicityMode(TRANSACTIONAL); + + cfg.setCacheConfiguration(atomicCfg, txCfg); + + cfg.setClientMode(igniteInstanceName.contains("client")); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(LOCAL_IP_FINDER); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid("server-1"); + startGrid("server-2"); + startGrid("client"); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + atomicCache = grid("client").cache(ATOMIC_CACHE); + txCache = grid("client").cache(TX_CACHE); + } + + /** + * @throws Exception If failed. + */ + public void testInvokeAtomic() throws Exception { + testInvoke(atomicCache); + testInvokeAll(atomicCache); + } + + /** + * @throws Exception If failed. 
+ */ + public void testInvokeTx() throws Exception { + testInvoke(txCache); + testInvokeAll(txCache); + + IgniteEx client = grid("client"); + + Callable<Object> clos = new Callable<Object>() { + @Override public Object call() throws Exception { + testInvoke(txCache); + testInvokeAll(txCache); + + return null; + } + }; + + doInTransaction(client, PESSIMISTIC, READ_COMMITTED, clos); + doInTransaction(client, PESSIMISTIC, REPEATABLE_READ, clos); + doInTransaction(client, PESSIMISTIC, SERIALIZABLE, clos); + doInTransaction(client, OPTIMISTIC, READ_COMMITTED, clos); + doInTransaction(client, OPTIMISTIC, REPEATABLE_READ, clos); + doInTransaction(client, OPTIMISTIC, SERIALIZABLE, clos); + } + + /** + * @param cache Cache. + */ + private void testInvoke(final IgniteCache<Object, Object> cache) throws Exception { + Class<? extends Exception> exp = grid("client").transactions().tx() == null + ? EntryProcessorException.class + : NonSerializableException.class; + + //noinspection ThrowableNotThrown + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + cache.invoke("1", new UpdateProcessor()); + + return null; + } + }, exp, null); + + if (ATOMIC_CACHE.equals(cache.getName())) { + //noinspection ThrowableNotThrown + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + cache.invoke("1", new UpdateValueProcessor()); + + return null; + } + }, CachePartialUpdateException.class, null); + } + } + + /** + * @param cache Cache. + */ + private void testInvokeAll(final IgniteCache<Object, Object> cache) throws Exception { + Map<Object, EntryProcessorResult<Object>> results = cache.invokeAll(Collections.singleton("1"), new UpdateProcessor()); + + final EntryProcessorResult<Object> epRes = F.first(results.values()); + + assertNotNull(epRes); + + // In transactions EP will be invoked locally. + Class<? extends Exception> exp = grid("client").transactions().tx() == null + ? 
EntryProcessorException.class + : NonSerializableException.class; + + //noinspection ThrowableNotThrown + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + epRes.get(); + + return null; + } + }, exp, null); + + if (ATOMIC_CACHE.equals(cache.getName())) { + //noinspection ThrowableNotThrown + assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + cache.invokeAll(Collections.singleton("1"), new UpdateValueProcessor()); + + return null; + } + }, CachePartialUpdateException.class, null); + } + } + + /** + * + */ + private static class Value implements Externalizable, Binarylizable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public Value() { + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new NotSerializableException("Test marshalling exception"); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + throw new BinaryObjectException("Test marshalling exception"); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + // No-op. + } + } + + /** + * + */ + private static class NonSerializableException extends EntryProcessorException implements Externalizable, Binarylizable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public NonSerializableException() { + super(); + } + + /** + * @param msg Message. 
+ */ + NonSerializableException(String msg) { + super(msg); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new NotSerializableException("Test marshalling exception"); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + throw new BinaryObjectException("Test marshalling exception"); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + // No-op. + } + } + + /** + * + */ + private static class UpdateProcessor implements CacheEntryProcessor<Object, Object, Object> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Object, Object> entry, + Object... arguments) throws EntryProcessorException { + throw new NonSerializableException("Test exception"); + } + } + + /** + * + */ + private static class UpdateValueProcessor implements CacheEntryProcessor<Object, Object, Object> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Object, Object> entry, + Object... 
arguments) throws EntryProcessorException { + return new Value(); + } + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 20_000; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/65e1c577/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index ba7aa1b..7a4d4be 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -98,6 +98,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClass import org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheFailedUpdateResponseTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheReadFromBackupTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSingleGetMessageTest; @@ -238,6 +239,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCacheStartTest.class); suite.addTestSuite(CacheDiscoveryDataConcurrentJoinTest.class); suite.addTestSuite(IgniteClientCacheInitializationFailTest.class); + suite.addTestSuite(IgniteCacheFailedUpdateResponseTest.class); suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
