IGNITE-5401 Fix in MarshallerContextImpl. This closes #2205.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/740b0b2b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/740b0b2b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/740b0b2b Branch: refs/heads/ignite-2.1 Commit: 740b0b2bdb37154857363c1d94ec88d867bd8b65 Parents: 907d4a8 Author: Sergey Chugunov <[email protected]> Authored: Thu Jul 6 12:20:55 2017 +0300 Committer: devozerov <[email protected]> Committed: Thu Jul 6 12:20:55 2017 +0300 ---------------------------------------------------------------------- .../jdbc2/JdbcAbstractDmlStatementSelfTest.java | 12 + .../jdbc2/JdbcDynamicIndexAbstractSelfTest.java | 2 - .../JdbcThinDynamicIndexAbstractSelfTest.java | 2 - .../ignite/internal/MarshallerContextImpl.java | 10 +- .../GridMarshallerMappingProcessor.java | 16 +- ...iteMarshallerCacheClassNameConflictTest.java | 11 +- .../IgniteMarshallerCacheFSRestoreTest.java | 217 +++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + 8 files changed, 248 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/740b0b2b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java index a001eb3..f220b47 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; 
import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX; @@ -139,6 +140,17 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac conn.close(); assertTrue(conn.isClosed()); + + cleanUpWorkingDir(); + } + + /** + * Clean up working directory. + */ + private void cleanUpWorkingDir() throws Exception { + String workDir = U.defaultWorkDirectory(); + + deleteRecursively(U.resolveWorkDirectory(workDir, "marshaller", false)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/740b0b2b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java index d4da1f3..7bbda6f 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java @@ -242,8 +242,6 @@ public abstract class JdbcDynamicIndexAbstractSelfTest extends JdbcAbstractDmlSt * Test that changes in cache affect index, and vice versa. 
*/ public void testIndexState() throws SQLException { - fail("https://issues.apache.org/jira/browse/IGNITE-5373"); - IgniteCache<String, Person> cache = cache(); assertSize(3); http://git-wip-us.apache.org/repos/asf/ignite/blob/740b0b2b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java index 7404ebd..3f762fc 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java @@ -254,8 +254,6 @@ public abstract class JdbcThinDynamicIndexAbstractSelfTest extends JdbcThinAbstr * @throws SQLException If failed. */ public void testIndexState() throws SQLException { - fail("https://issues.apache.org/jira/browse/IGNITE-5373"); - IgniteCache<String, Person> cache = cache(); assertSize(3); http://git-wip-us.apache.org/repos/asf/ignite/blob/740b0b2b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index cad06c3..6f15507 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -316,18 +316,14 @@ public class MarshallerContextImpl implements MarshallerContext { * * @param item type mapping to propose * @return null if cache doesn't contain any mappings for given (platformId, typeId) pair, - * 
previous class name otherwise. + * previous {@link MappedName mapped name} otherwise. */ - public String onMappingProposed(MarshallerMappingItem item) { + public MappedName onMappingProposed(MarshallerMappingItem item) { ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId()); MappedName newName = new MappedName(item.className(), false); - MappedName oldName; - if ((oldName = cache.putIfAbsent(item.typeId(), newName)) == null) - return null; - else - return oldName.className(); + return cache.putIfAbsent(item.typeId(), newName); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/740b0b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index c23d068..df0c720 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -113,7 +113,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter { marshallerCtx.onMarshallerProcessorStarted(ctx, transport); - discoMgr.setCustomEventListener(MappingProposedMessage.class, new MarshallerMappingExchangeListener()); + discoMgr.setCustomEventListener(MappingProposedMessage.class, new MappingProposedListener()); discoMgr.setCustomEventListener(MappingAcceptedMessage.class, new MappingAcceptedListener()); @@ -233,7 +233,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter { /** * */ - private final class MarshallerMappingExchangeListener implements CustomEventListener<MappingProposedMessage> { + private final class 
MappingProposedListener implements CustomEventListener<MappingProposedMessage> { /** {@inheritDoc} */ @Override public void onCustomEvent( AffinityTopologyVersion topVer, @@ -246,13 +246,15 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter { if (!msg.inConflict()) { MarshallerMappingItem item = msg.mappingItem(); - String conflictingName = marshallerCtx.onMappingProposed(item); + MappedName existingName = marshallerCtx.onMappingProposed(item); - if (conflictingName != null) { - if (conflictingName.equals(item.className())) + if (existingName != null) { + String existingClsName = existingName.className(); + + if (existingClsName.equals(item.className()) && !existingName.accepted()) msg.markDuplicated(); - else - msg.conflictingWithClass(conflictingName); + else if (!existingClsName.equals(item.className())) + msg.conflictingWithClass(existingClsName); } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/740b0b2b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java index c8a0e76..80d0fd1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java @@ -55,9 +55,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; */ public class IgniteMarshallerCacheClassNameConflictTest extends GridCommonAbstractTest { /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ private volatile boolean 
bbClsRejected; /** */ @@ -79,7 +76,7 @@ public class IgniteMarshallerCacheClassNameConflictTest extends GridCommonAbstra IgniteConfiguration cfg = super.getConfiguration(gridName); TcpDiscoverySpi disco = new TestTcpDiscoverySpi(); - disco.setIpFinder(ipFinder); + disco.setIpFinder(LOCAL_IP_FINDER); cfg.setDiscoverySpi(disco); @@ -207,7 +204,7 @@ public class IgniteMarshallerCacheClassNameConflictTest extends GridCommonAbstra DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null : (DiscoveryCustomMessage) U.field(spiCustomMsg, "delegate"); - if (customMsg != null) + if (customMsg != null) { //don't want to make this class public, using equality of class name instead of instanceof operator if ("MappingProposedMessage".equals(customMsg.getClass().getSimpleName())) { String conflClsName = U.field(customMsg, "conflictingClsName"); @@ -219,8 +216,10 @@ public class IgniteMarshallerCacheClassNameConflictTest extends GridCommonAbstra aaClsRejected = true; } } + } - delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg); + if (delegate != null) + delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/740b0b2b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java new file mode 100644 index 0000000..38fa324 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Map; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest { + /** 
*/ + private volatile boolean isDuplicateObserved = true; + + /** + * + */ + private static class SimpleValue { + /** */ + private final int iF; + + /** */ + private final String sF; + + /** + * @param iF Int field. + * @param sF String field. + */ + SimpleValue(int iF, String sF) { + this.iF = iF; + this.sF = sF; + } + } + + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi(); + discoSpi.setIpFinder(LOCAL_IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + CacheConfiguration singleCacheConfig = new CacheConfiguration() + .setName(DEFAULT_CACHE_NAME) + .setCacheMode(CacheMode.PARTITIONED) + .setBackups(1) + .setAtomicityMode(CacheAtomicityMode.ATOMIC); + + cfg.setCacheConfiguration(singleCacheConfig); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + cleanUpWorkDir(); + } + + /** + * + */ + private void cleanUpWorkDir() throws Exception { + String workDir = U.defaultWorkDirectory(); + + deleteRecursively(U.resolveWorkDirectory(workDir, "marshaller", false)); + } + + /** + * Test checks a scenario when in multinode cluster one node may read marshaller mapping + * from file storage and add it directly to marshaller context with accepted=true flag, + * when another node sends a proposed request for the same mapping. + * + * In that case the request must not be marked as duplicate and must be processed in a regular way. + * No hangs must take place. + * + * @see <a href="https://issues.apache.org/jira/browse/IGNITE-5401">IGNITE-5401</a> Take a look at JIRA ticket for more information about context of this test. + * + * This test must never hang on proposing of MarshallerMapping. 
+ */ + public void testFileMappingReadAndPropose() throws Exception { + prepareMarshallerFileStore(); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + BinaryObject obj0 = ignite0.binary().builder(SimpleValue.class.getName()) + .setField("iF", 10) + .setField("sF", "str0") + .build(); + + BinaryObject obj1 = ignite0.binary().builder(SimpleValue.class.getName()) + .setField("iF", 20) + .setField("sF", "str1") + .build(); + + IgniteCache<Object, Object> binCache = ignite0.cache(DEFAULT_CACHE_NAME).withKeepBinary(); + + binCache.put(1, obj0); + binCache.put(2, obj1); + + ignite0.cache(DEFAULT_CACHE_NAME).remove(1); + + ignite1.cache(DEFAULT_CACHE_NAME).put(3, new SimpleValue(30, "str2")); + + assertFalse(isDuplicateObserved); + } + + /** + * + */ + private void prepareMarshallerFileStore() throws Exception { + String typeName = SimpleValue.class.getName(); + int typeId = typeName.toLowerCase().hashCode(); + + String fileName = typeId + ".classname0"; + + File marshStoreDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "marshaller", false); + + try(FileOutputStream out = new FileOutputStream(new File(marshStoreDir, fileName))) { + try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) { + writer.write(typeName); + + writer.flush(); + } + } + } + + /** */ + private class TestTcpDiscoverySpi extends TcpDiscoverySpi { + + /** */ + private class DiscoverySpiListenerWrapper implements DiscoverySpiListener { + /** */ + private DiscoverySpiListener delegate; + + /** + * @param delegate Delegate. + */ + private DiscoverySpiListenerWrapper(DiscoverySpiListener delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public void onDiscovery( + int type, + long topVer, + ClusterNode node, + Collection<ClusterNode> topSnapshot, + @Nullable Map<Long, Collection<ClusterNode>> topHist, + @Nullable DiscoverySpiCustomMessage spiCustomMsg + ) { + DiscoveryCustomMessage customMsg = spiCustomMsg == null ? 
null + : (DiscoveryCustomMessage) U.field(spiCustomMsg, "delegate"); + + if (customMsg != null) { + //don't want to make this class public, using equality of class name instead of instanceof operator + if ("MappingProposedMessage".equals(customMsg.getClass().getSimpleName())) { + try { + isDuplicateObserved = U.invoke(MappingProposedMessage.class, customMsg, "duplicated"); + } + catch (Exception e) { + log().error("Error when examining MappingProposedMessage.", e); + } + } + } + + if (delegate != null) + delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg); + } + + /** {@inheritDoc} */ + @Override public void onLocalNodeInitialized(ClusterNode locNode) { + // No-op. + } + } + + /** {@inheritDoc} */ + @Override public void setListener(@Nullable DiscoverySpiListener lsnr) { + super.setListener(new DiscoverySpiListenerWrapper(lsnr)); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/740b0b2b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index de509ab..d79e868 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCac import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClassNameConflictTest; import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingOnMissTest; import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheConcurrentReadWriteTest; +import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheFSRestoreTest; import 
org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest; import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest; import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest; @@ -173,6 +174,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(FreeListImplSelfTest.class); suite.addTestSuite(MemoryMetricsSelfTest.class); + suite.addTestSuite(IgniteMarshallerCacheFSRestoreTest.class); suite.addTestSuite(IgniteMarshallerCacheClassNameConflictTest.class); suite.addTestSuite(IgniteMarshallerCacheClientRequestsMappingOnMissTest.class);
