http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcComplexQuerySelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcComplexQuerySelfTest.java index 03196a4,35321b2..ebda604 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcComplexQuerySelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcComplexQuerySelfTest.java @@@ -52,12 -49,18 +52,12 @@@ public class JdbcComplexQuerySelfTest e private Statement stmt; /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - CacheConfiguration<?,?> cache = defaultCacheConfiguration(); - - cache.setCacheMode(PARTITIONED); - cache.setBackups(1); - cache.setWriteSynchronizationMode(FULL_SYNC); - cache.setAtomicityMode(TRANSACTIONAL); - cache.setIndexedTypes(String.class, Organization.class, AffinityKey.class, Person.class); - - cfg.setCacheConfiguration(cache); + cfg.setCacheConfiguration( + cacheConfiguration("pers", AffinityKey.class, Person.class), + cacheConfiguration("org", String.class, Organization.class)); TcpDiscoverySpi disco = new TcpDiscoverySpi();
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcLocalCachesSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java index 7266f52,fb22203..750af74 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java @@@ -54,15 -53,20 +54,15 @@@ public class JdbcMetadataSelfTest exten private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** JDBC URL. */ - private static final String BASE_URL = CFG_URL_PREFIX + "modules/clients/src/test/config/jdbc-config.xml"; + private static final String BASE_URL = CFG_URL_PREFIX + "cache=pers@modules/clients/src/test/config/jdbc-config.xml"; /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - CacheConfiguration<?,?> cache = defaultCacheConfiguration(); - - cache.setCacheMode(PARTITIONED); - cache.setBackups(1); - cache.setWriteSynchronizationMode(FULL_SYNC); - cache.setIndexedTypes(String.class, Organization.class, AffinityKey.class, Person.class); - - cfg.setCacheConfiguration(cache); + cfg.setCacheConfiguration( + cacheConfiguration("pers", AffinityKey.class, Person.class), + cacheConfiguration("org", String.class, Organization.class)); TcpDiscoverySpi disco 
= new TcpDiscoverySpi(); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatementSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcComplexQuerySelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcComplexQuerySelfTest.java index 656bc5d,e5cd09b..dd035b2 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcComplexQuerySelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcComplexQuerySelfTest.java @@@ -51,10 -51,18 +51,10 @@@ public class JdbcComplexQuerySelfTest e private Statement stmt; /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - CacheConfiguration<?,?> cache = defaultCacheConfiguration(); - - cache.setCacheMode(PARTITIONED); - cache.setBackups(1); - cache.setWriteSynchronizationMode(FULL_SYNC); - cache.setAtomicityMode(TRANSACTIONAL); - cache.setIndexedTypes(String.class, Organization.class, AffinityKey.class, Person.class); - - cfg.setCacheConfiguration(cache); + cfg.setCacheConfiguration(cacheConfiguration()); TcpDiscoverySpi disco = new TcpDiscoverySpi(); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcMetadataSelfTest.java ---------------------------------------------------------------------- diff --cc modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcMetadataSelfTest.java index 59b69cf,48ed548..3ba5ed2 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcMetadataSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcMetadataSelfTest.java @@@ -52,12 -52,21 +52,12 @@@ public class JdbcMetadataSelfTest exten private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** URL. */ - private static final String URL = "jdbc:ignite://127.0.0.1/"; + private static final String URL = "jdbc:ignite://127.0.0.1/pers"; /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - CacheConfiguration<?,?> cache = defaultCacheConfiguration(); - - cache.setCacheMode(PARTITIONED); - cache.setBackups(1); - cache.setWriteSynchronizationMode(FULL_SYNC); - cache.setIndexedTypes(String.class, Organization.class, AffinityKey.class, Person.class); - - cfg.setCacheConfiguration(cache); - TcpDiscoverySpi disco = new TcpDiscoverySpi(); disco.setIpFinder(IP_FINDER); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java index bbe2872,53f9e4c..750d3e1 --- a/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java @@@ -38,11 -41,8 +41,11 @@@ public class QueryIndex implements Seri private LinkedHashMap<String, Boolean> fields; /** */ - private QueryIndexType type; + private QueryIndexType type = DFLT_IDX_TYP; + /** */ + private int inlineSize = -1; + /** * Creates an empty index. Should be populated via setters. */ @@@ -223,26 -232,11 +235,29 @@@ * Sets index type. * * @param type Index type. + * @return {@code this} for chaining. */ - public void setIndexType(QueryIndexType type) { + public QueryIndex setIndexType(QueryIndexType type) { this.type = type; + + return this; } + + /** + * Gets inline size. + * + * @return inline size. + */ + public int getInlineSize() { + return inlineSize; + } + + /** + * Sets inline size. + * + * @param inlineSize Inline size. 
+ */ + public void setInlineSize(int inlineSize) { + this.inlineSize = inlineSize; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 20ec5e6,a47f07c..25398ca --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@@ -2527,11 -2540,10 +2530,11 @@@ public class CacheConfiguration<K, V> e * * @param idxName Index name. * @param type Index type. + * @param inlineSize Inline size. * @return Index descriptor. */ - public IndexDescriptor addIndex(String idxName, GridQueryIndexType type, int inlineSize) { - public IndexDescriptor addIndex(String idxName, QueryIndexType type) { - IndexDescriptor idx = new IndexDescriptor(type); ++ public IndexDescriptor addIndex(String idxName, QueryIndexType type, int inlineSize) { + IndexDescriptor idx = new IndexDescriptor(type, inlineSize); if (indexes.put(idxName, idx) != null) throw new CacheException("Index with name '" + idxName + "' already exists."); @@@ -2540,17 -2552,6 +2543,17 @@@ } /** + * Adds index. + * + * @param idxName Index name. + * @param type Index type. + * @return Index descriptor. + */ - public IndexDescriptor addIndex(String idxName, GridQueryIndexType type) { ++ public IndexDescriptor addIndex(String idxName, QueryIndexType type) { + return addIndex(idxName, type, -1); + } + + /** * Adds field to index. * * @param idxName Index name. 
@@@ -2675,27 -2676,15 +2678,27 @@@ private Collection<String> descendings; /** */ - private final GridQueryIndexType type; + private final QueryIndexType type; + /** */ + private final int inlineSize; + /** * @param type Type. + * @param inlineSize Inline size. */ - private IndexDescriptor(GridQueryIndexType type, int inlineSize) { - private IndexDescriptor(QueryIndexType type) { ++ private IndexDescriptor(QueryIndexType type, int inlineSize) { assert type != null; this.type = type; + this.inlineSize = inlineSize; + } + + /** + * @param type Type. + */ - private IndexDescriptor(GridQueryIndexType type) { ++ private IndexDescriptor(QueryIndexType type) { + this(type, -1); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index cda74ae,5b3dcc9..4ae2b90 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@@ -513,8 -509,6 +513,7 @@@ public class IgniteConfiguration allResolversPassReq = cfg.isAllSegmentationResolversPassRequired(); atomicCfg = cfg.getAtomicConfiguration(); binaryCfg = cfg.getBinaryConfiguration(); - daemon = cfg.isDaemon(); + dbCfg = cfg.getMemoryConfiguration(); cacheCfg = cfg.getCacheConfiguration(); cacheKeyCfg = cfg.getCacheKeyConfiguration(); cacheSanityCheckEnabled = cfg.isCacheSanityCheckEnabled(); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index 7bbe9e2,560d7f6..c91daae --- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@@ -49,7 -49,7 +52,10 @@@ public interface GridComponent CLUSTER_PROC, /** */ - DISCOVERY_PROC ++ DISCOVERY_PROC, ++ ++ /** */ + MARSHALLER_PROC } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index ee70f1e,0738df3..f2e107f --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@@ -49,7 -49,9 +49,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; + import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; import org.apache.ignite.internal.processors.odbc.OdbcProcessor; -import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.pool.PoolProcessor; http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index f12c9b8,60b368c..ab2e000 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@@ -65,9 -65,12 +65,11 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; + import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor; import org.apache.ignite.internal.processors.odbc.OdbcProcessor; -import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; + import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.port.GridPortProcessor; http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 05002a7,0ea6ea4..6252182 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@@ -122,8 -122,10 +123,9 @@@ import 
org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor; import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor; import org.apache.ignite.internal.processors.odbc.OdbcProcessor; -import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; import org.apache.ignite.internal.processors.platform.PlatformNoopProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; + import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.port.GridPortProcessor; @@@ -889,47 -907,40 +903,49 @@@ public class IgniteKernal implements Ig // Start processors before discovery manager, so they will // be able to start receiving messages once discovery completes. 
- startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx)); - startProcessor(new GridClockSyncProcessor(ctx)); - startProcessor(new GridAffinityProcessor(ctx)); - startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); - startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx)); - startProcessor(new GridCacheProcessor(ctx)); - startProcessor(new GridQueryProcessor(ctx)); - startProcessor(new OdbcProcessor(ctx)); - startProcessor(new GridServiceProcessor(ctx)); - startProcessor(new GridTaskSessionProcessor(ctx)); - startProcessor(new GridJobProcessor(ctx)); - startProcessor(new GridTaskProcessor(ctx)); - startProcessor((GridProcessor)SCHEDULE.createOptional(ctx)); - startProcessor(new GridRestProcessor(ctx)); - startProcessor(new DataStreamProcessor(ctx)); - startProcessor((GridProcessor)IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration()))); - startProcessor(new GridContinuousProcessor(ctx)); - startProcessor(createHadoopComponent()); - startProcessor(new DataStructuresProcessor(ctx)); - startProcessor(createComponent(PlatformProcessor.class, ctx)); - startProcessor(new GridMarshallerMappingProcessor(ctx)); - - // Start plugins. 
- for (PluginProvider provider : ctx.plugins().allProviders()) { - ctx.add(new GridPluginComponent(provider)); + try { + startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx)); + startProcessor(new GridClockSyncProcessor(ctx)); + startProcessor(new GridAffinityProcessor(ctx)); + startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); + startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx)); + startProcessor(new GridCacheProcessor(ctx)); + startProcessor(new GridClusterStateProcessor(ctx)); + startProcessor(new GridQueryProcessor(ctx)); + startProcessor(new OdbcProcessor(ctx)); + startProcessor(new GridServiceProcessor(ctx)); + startProcessor(new GridTaskSessionProcessor(ctx)); + startProcessor(new GridJobProcessor(ctx)); + startProcessor(new GridTaskProcessor(ctx)); + startProcessor((GridProcessor)SCHEDULE.createOptional(ctx)); + startProcessor(new GridRestProcessor(ctx)); + startProcessor(new DataStreamProcessor(ctx)); + startProcessor((GridProcessor)IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration()))); + startProcessor(new GridContinuousProcessor(ctx)); + startProcessor(createHadoopComponent()); + startProcessor(new DataStructuresProcessor(ctx)); + startProcessor(createComponent(PlatformProcessor.class, ctx)); ++ startProcessor(new GridMarshallerMappingProcessor(ctx)); + + // Start plugins. + for (PluginProvider provider : ctx.plugins().allProviders()) { + ctx.add(new GridPluginComponent(provider)); + + provider.start(ctx.plugins().pluginContextForProvider(provider)); + } - fillNodeAttributes(clusterProc.updateNotifierEnabled()); - provider.start(ctx.plugins().pluginContextForProvider(provider)); -- } ++ // Start platform plugins. 
++ if (ctx.config().getPlatformConfiguration() != null) ++ startProcessor(new PlatformPluginProcessor(ctx)); ++ ++ fillNodeAttributes(clusterProc.updateNotifierEnabled()); ++ } + catch (Throwable e) { + U.error( + log, "Exception during start processors, node will be stopped and close connections", e); - // Start platform plugins. - if (ctx.config().getPlatformConfiguration() != null) - startProcessor(new PlatformPluginProcessor(ctx)); + // Stop discovery spi to close tcp socket. + ctx.discovery().stop(true); - fillNodeAttributes(clusterProc.updateNotifierEnabled()); + throw e; + } gw.writeLock(); @@@ -956,15 -967,13 +972,17 @@@ ctx.performance().addAll(OsConfigurationSuggestions.getSuggestions()); // Notify discovery manager the first to make sure that topology is discovered. - ctx.discovery().onKernalStart(); + ctx.discovery().onKernalStart(activeOnStart); // Notify IO manager the second so further components can send and receive messages. - ctx.io().onKernalStart(); + ctx.io().onKernalStart(activeOnStart); + + // Start plugins. + for (PluginProvider provider : ctx.plugins().allProviders()) + provider.onIgniteStart(); + boolean recon = false; + // Callbacks. for (GridComponent comp : ctx) { // Skip discovery manager. @@@ -975,13 -984,24 +993,27 @@@ if (comp instanceof GridIoManager) continue; + if (comp instanceof GridPluginComponent) + continue; + - if (!skipDaemon(comp)) - comp.onKernalStart(activeOnStart); + if (!skipDaemon(comp)) { + try { - comp.onKernalStart(); ++ comp.onKernalStart(activeOnStart); + } + catch (IgniteNeedReconnectException e) { + assert ctx.discovery().reconnectSupported(); + + if (log.isDebugEnabled()) + log.debug("Failed to start node components on node start, will wait for reconnect: " + e); + + recon = true; + } + } } + if (recon) + reconnectState.waitFirstReconnect(); + // Register MBeans. 
registerKernalMBean(); registerLocalNodeMBean(); @@@ -2031,11 -2049,9 +2065,9 @@@ List<GridComponent> comps = ctx.components(); - ctx.marshallerContext().onKernalStop(); - // Callback component in reverse order while kernal is still functional // if called in the same thread, at least. - for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) { + for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious(); ) { GridComponent comp = it.previous(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index cead53b,f6cfe12..9b56074 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@@ -2382,8 -2402,8 +2390,8 @@@ public class IgnitionEx try { grid0.stop(cancel); - if (log.isDebugEnabled()) + if (log != null && log.isDebugEnabled()) - log.debug("Grid instance stopped ok: " + name); + log.debug("Ignite instance stopped ok: " + name); } catch (Throwable e) { U.error(log, "Failed to properly stop grid instance due to undeclared exception.", e); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index a50c22e,5416ff0..abea48b --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@@ -18,34 -18,43 +18,42 @@@ package org.apache.ignite.internal; import java.io.BufferedReader; - import java.io.File; - import java.io.FileInputStream; - import java.io.FileOutputStream; import java.io.IOException; + import java.io.InputStream; import java.io.InputStreamReader; - import java.io.OutputStreamWriter; - import java.io.Writer; - import java.nio.channels.FileChannel; - import java.nio.channels.FileLock; - import java.nio.channels.OverlappingFileLockException; - import java.nio.charset.StandardCharsets; + import java.net.URL; + import java.util.AbstractMap; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Enumeration; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Iterator; import java.util.List; - import java.util.concurrent.CountDownLatch; - import java.util.concurrent.ThreadLocalRandom; - import java.util.concurrent.locks.Lock; - import javax.cache.event.CacheEntryEvent; - import javax.cache.event.CacheEntryListenerException; - import javax.cache.event.CacheEntryUpdatedListener; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; import org.apache.ignite.IgniteCheckedException; - import org.apache.ignite.IgniteLogger; - import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; - import org.apache.ignite.internal.processors.cache.GridCacheAdapter; - import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; - import org.apache.ignite.internal.util.GridStripedLock; - import org.apache.ignite.internal.util.typedef.F; - import org.apache.ignite.internal.util.typedef.internal.CU; + import org.apache.ignite.IgniteException; + import org.apache.ignite.configuration.IgniteConfiguration; + import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; + import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; + import org.apache.ignite.internal.processors.closure.GridClosureProcessor; + import org.apache.ignite.internal.processors.marshaller.MappedName; + import org.apache.ignite.internal.processors.marshaller.MappingExchangeResult; + import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem; + import org.apache.ignite.internal.processors.marshaller.MarshallerMappingTransport; + import org.apache.ignite.internal.util.IgniteUtils; + import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.marshaller.MarshallerContext; import org.apache.ignite.plugin.PluginProvider; + import org.jetbrains.annotations.NotNull; + import org.jetbrains.annotations.Nullable; + import org.jsr166.ConcurrentHashMap8; + + import static org.apache.ignite.internal.MarshallerPlatformIds.JAVA_ID; /** * Marshaller context implementation. @@@ -137,10 -262,67 +261,67 @@@ public class MarshallerContextImpl impl } /** - * Release marshaller context. + * @param res result of exchange. */ - public void onKernalStop() { - latch.countDown(); + private boolean convertXchRes(MappingExchangeResult res) throws IgniteCheckedException { + if (res.successful()) + return true; + else if (res.exchangeDisabled()) + return false; + else { + assert res.error() != null; + throw res.error(); + } + } + + /** + * @param platformId Platform id. + * @param typeId Type id. + * @param conflictingClsName Conflicting class name. + * @param clsName Class name. 
+ */ + private IgniteCheckedException duplicateIdException( + byte platformId, + int typeId, + String conflictingClsName, + String clsName + ) { + return new IgniteCheckedException("Duplicate ID [platformId=" - + platformId - + ", typeId=" - + typeId - + ", oldCls=" - + conflictingClsName - + ", newCls=" - + clsName + "]"); ++ + platformId ++ + ", typeId=" ++ + typeId ++ + ", oldCls=" ++ + conflictingClsName ++ + ", newCls=" ++ + clsName + "]"); + } + + /** + * + * @param item type mapping to propose + * @return false if there is a conflict with another mapping in local cache, true otherwise. + */ + public String onMappingProposed(MarshallerMappingItem item) { + ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId()); + + MappedName newName = new MappedName(item.className(), false); + MappedName oldName; + + if ((oldName = cache.putIfAbsent(item.typeId(), newName)) == null) + return null; + else + return oldName.className(); + } + + /** + * @param item Item. + */ + public void onMappingAccepted(final MarshallerMappingItem item) { + ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId()); + + cache.replace(item.typeId(), new MappedName(item.className(), true)); + + closProc.runLocalSafe(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), item.className())); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index afe4a8e,07e8941..214c507 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@@ -174,21 -177,26 +180,41 @@@ public class GridIoMessageFactory imple Message msg = null; switch (type) { - case -48: ++ case -51: + msg = new NearCacheUpdates(); + + break; + - case -47: ++ case -50: + msg = new GridNearAtomicCheckUpdateRequest(); + + break; + - case -46: ++ case -49: + msg = new UpdateErrors(); + + break; + - case -45: ++ case -48: + msg = new GridDhtAtomicNearResponse(); + + break; + + case -47: + msg = new SnapshotProgressMessage(); + + break; + + case -46: + 
msg = new GridChangeGlobalStateMessageResponse(); + + break; + + case -45: + msg = new SnapshotFinishedMessage(); + + break; + case -44: msg = new TcpCommunicationSpi.HandshakeMessage2(); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 6bad92a,3bfd1f8..61cbba5 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@@ -86,13 -85,12 +86,12 @@@ import org.apache.ignite.internal.proce import 
org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; + import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; - import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; - import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; @@@ -506,9 -500,8 +505,8 @@@ public abstract class GridCacheAdapter< @Override public final GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) { assert !CU.isUtilityCache(ctx.name()); assert !CU.isAtomicsCache(ctx.name()); - assert !CU.isMarshallerCache(ctx.name()); - CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false, null); + CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false, null, false); return new GridCacheProxyImpl<>(ctx, this, opCtx); } @@@ -2218,8 -2130,7 +2216,8 @@@ } else { return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) { - @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx, - @Override 
public IgniteInternalFuture<Map<K1, V1>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { ++ @Override public IgniteInternalFuture<Map<K1, V1>> op(GridNearTxLocal tx, + AffinityTopologyVersion readyTopVer) { return tx.getAllAsync(ctx, readyTopVer, keys, @@@ -2227,6 -2138,6 +2225,7 @@@ skipVals, false, !readThrough, ++ recovery, needVer); } }, ctx.operationContextPerCall()); @@@ -2619,8 -2530,7 +2618,8 @@@ validateCacheKeys(map.keySet()); IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) { - @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, - @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { ++ @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, + AffinityTopologyVersion readyTopVer) { return tx.invokeAsync(ctx, readyTopVer, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java index 5f6a767,8c47554..df19225 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java @@@ -17,9 -17,12 +17,10 @@@ package org.apache.ignite.internal.processors.cache; -import java.util.Iterator; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + import org.apache.ignite.internal.processors.query.QueryUtils; import 
org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 68f786e,5573cea..e8513de --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@@ -80,8 -81,8 +80,9 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; -import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; import org.apache.ignite.internal.processors.plugin.CachePluginManager; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; ++import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.lang.GridFunc; @@@ -2015,13 -2069,6 +2017,13 @@@ public class GridCacheContext<K, V> imp } /** + * @return {@code True} if fast eviction is allowed. + */ + public boolean allowFastEviction() { - return shared().database().persistenceEnabled() && !GridQueryProcessor.isEnabled(cacheCfg); ++ return shared().database().persistenceEnabled() && !QueryUtils.isEnabled(cacheCfg); + } + + /** * @param part Partition. * @param affNodes Affinity nodes. * @param topVer Topology version. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java index 14e2bb6,39a7b36..9e5620c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java @@@ -268,13 -267,10 +267,8 @@@ public class GridCacheDeploymentManager if (cacheCtx.isNear()) cacheCtx.near().dht().clearLocally(keys, true); - GridCacheQueryManager<K, V> qryMgr = cacheCtx.queries(); - - if (qryMgr != null) - qryMgr.onUndeploy(ldr); - // Examine swap for entries to undeploy. - int swapUndeployCnt = cacheCtx.isNear() ? - cacheCtx.near().dht().context().swap().onUndeploy(ldr) : - cacheCtx.swap().onUndeploy(ldr); + int swapUndeployCnt = cacheCtx.offheap().onUndeploy(ldr); if (cacheCtx.userCache() && (!keys.isEmpty() || swapUndeployCnt != 0)) { U.quietAndWarn(log, ""); http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index f339f46,3668910..7073eed --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -30,11 -30,8 +30,9 @@@ import java.util.LinkedList import java.util.List; import java.util.Map; import java.util.NavigableMap; - import java.util.Queue; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.BlockingQueue; - import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.LinkedBlockingDeque; @@@ -54,11 -52,11 +52,13 @@@ import org.apache.ignite.internal.Ignit import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; ++import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.discovery.DiscoCache; + import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; @@@ -68,9 -66,10 +68,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; --import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; @@@ -276,15 -268,19 +274,25 @@@ public class GridCachePartitionExchange exchFut = exchangeFuture(exchId, evt, cache, null, msg); } } - else - exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); + else { + exchangeFuture(msg.exchangeId(), null, null, null, null) + .onAffinityChangeMessage(evt.eventNode(), msg); + } } - else if (customEvt.customMessage() instanceof StartFullSnapshotAckDiscoveryMessage - && !((StartFullSnapshotAckDiscoveryMessage)customEvt.customMessage()).hasError()) { ++ else if (customMsg instanceof StartFullSnapshotAckDiscoveryMessage ++ && !((StartFullSnapshotAckDiscoveryMessage)customMsg).hasError()) { + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + + exchFut = exchangeFuture(exchId, evt, null, null, null); + } + else { + // Process event as custom discovery task if needed. 
+ CachePartitionExchangeWorkerTask task = + cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg); + + if (task != null) + exchWorker.addCustomTask(task); + } } if (exchId != null) { @@@ -1617,13 -1633,59 +1651,66 @@@ log.debug("Added exchange future to exchange worker: " + exchFut); } + /** {@inheritDoc} */ + @Override public void cancel() { + synchronized (interruptLock) { + super.cancel(); + } + } + + /** + * Add custom exchange task. + * + * @param task Task. + */ + void addCustomTask(CachePartitionExchangeWorkerTask task) { + assert task != null; + + assert !task.isExchange(); + + futQ.offer(task); + } + + /** + * Process custom exchange task. + * + * @param task Task. + */ + void processCustomTask(CachePartitionExchangeWorkerTask task) { + try { + cctx.cache().processCustomExchangeTask(task); + } + catch (Exception e) { + U.warn(log, "Failed to process custom exchange task: " + task, e); + } + } + + /** + * @return Whether pending exchange future exists. + */ + boolean hasPendingExchange() { + if (!futQ.isEmpty()) { + for (CachePartitionExchangeWorkerTask task : futQ) { + if (task.isExchange()) + return true; + } + } + + return false; + } + + /** + * Dump debug info. 
+ */ + void dumpExchangeDebugInfo() { + U.warn(log, "Pending exchange futures:"); + + for (CachePartitionExchangeWorkerTask task: futQ) { + if (task.isExchange()) + U.warn(log, ">>> " + task); + } + } + /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { long timeout = cctx.gridConfig().getNetworkTimeout(); @@@ -1631,10 -1693,10 +1718,10 @@@ int cnt = 0; while (!isCancelled()) { - CachePartitionExchangeWorkerTask task = null; - cnt++; - GridDhtPartitionsExchangeFuture exchFut = null; ++ CachePartitionExchangeWorkerTask task = null; + try { boolean preloadFinished = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index c18dbcf,a7d38a7..9d7b40f mode 100644,100755..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@@ -213,9 -213,8 +217,9 @@@ public class GridCacheProcessor extend caches = new ConcurrentHashMap<>(); jCacheProxies = new ConcurrentHashMap<>(); stopSeq = new LinkedList<>(); + internalCaches = new HashSet<>(); - marsh = MarshallerUtils.jdkMarshaller(ctx.gridName()); + marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName()); } /** @@@ -596,146 -642,99 +619,144 @@@ CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); - sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx, - ctx.config().getCacheStoreSessionListenerFactories())); + registerCacheFromConfig(cfgs); + + registerCacheFromPersistentStore(cfgs); + + if (log.isDebugEnabled()) + log.debug("Started cache processor."); + } + /** + * @param cfgs Cache 
configurations. + * @throws IgniteCheckedException If failed. + */ + private void registerCacheFromConfig(CacheConfiguration[] cfgs) throws IgniteCheckedException { for (int i = 0; i < cfgs.length; i++) { - if (ctx.config().isDaemon() && !CU.isMarshallerCache(cfgs[i].getName())) + if (ctx.config().isDaemon()) continue; - cloneCheckSerializable(cfgs[i]); - CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]); - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg); + cfgs[i] = cfg; // Replace original configuration value. - // Initialize defaults. - initialize(cfg, cacheObjCtx); + registerCache(cfg); + } + } - cfgs[i] = cfg; // Replace original configuration value. + /** + * @param cfgs Cache configurations. + * @throws IgniteCheckedException If failed. + */ + private void registerCacheFromPersistentStore(CacheConfiguration[] cfgs) throws IgniteCheckedException { + if (sharedCtx.pageStore() != null && + sharedCtx.database().persistenceEnabled() && + !ctx.config().isDaemon()) { - String masked = maskNull(cfg.getName()); + Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames(); - if (registeredCaches.containsKey(masked)) { - String cacheName = cfg.getName(); + for (CacheConfiguration cfg : cfgs) + savedCacheNames.remove(cfg.getName()); - if (cacheName != null) - throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + - "assign unique name to each cache): " + U.maskName(cacheName)); - else - throw new IgniteCheckedException("Default cache has already been configured (check configuration and " + - "assign unique name to each cache)."); + for (String name : internalCaches) + savedCacheNames.remove(name); + + if (!F.isEmpty(savedCacheNames)) { + log.info("Registrate persistent caches: " + savedCacheNames); + + for (String name : savedCacheNames) { + CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); + + if (cfg != null) + registerCache(cfg); + } } + } + } + + /** + * @param 
cfg Cache configuration. + * @throws IgniteCheckedException If failed. + */ + private void registerCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException { + cloneCheckSerializable(cfg); + + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg); + + // Initialize defaults. + initialize(cfg, cacheObjCtx); + + String masked = maskNull(cfg.getName()); + + if (registeredCaches.containsKey(masked)) { + String cacheName = cfg.getName(); + + if (cacheName != null) + throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + + "assign unique name to each cache): " + U.maskName(cacheName)); + else + throw new IgniteCheckedException("Default cache has already been configured (check configuration and " + + "assign unique name to each cache)."); + } - CacheType cacheType; + CacheType cacheType; - if (CU.isUtilityCache(cfg.getName())) - cacheType = CacheType.UTILITY; - else if (CU.isMarshallerCache(cfg.getName())) - cacheType = CacheType.MARSHALLER; - else if (internalCaches.contains(maskNull(cfg.getName()))) - cacheType = CacheType.INTERNAL; - else - cacheType = CacheType.USER; + if (CU.isUtilityCache(cfg.getName())) + cacheType = CacheType.UTILITY; + else if (internalCaches.contains(maskNull(cfg.getName()))) + cacheType = CacheType.INTERNAL; + else + cacheType = CacheType.USER; - boolean template = cfg.getName() != null && cfg.getName().endsWith("*"); - - DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheType, template, - IgniteUuid.randomUuid()); - - desc.locallyConfigured(true); - desc.staticallyConfigured(true); - desc.receivedFrom(ctx.localNodeId()); - - if (!template) { - registeredCaches.put(masked, desc); + boolean template = cfg.getName() != null && cfg.getName().endsWith("*"); - ctx.discovery().setCacheFilter( - cfg.getName(), - cfg.getNodeFilter(), - cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED, - cfg.getCacheMode()); + DynamicCacheDescriptor desc = new 
DynamicCacheDescriptor(ctx, + cfg, + cacheType, + template, + IgniteUuid.randomUuid()); - ctx.discovery().addClientNode(cfg.getName(), - ctx.localNodeId(), - cfg.getNearConfiguration() != null); + desc.locallyConfigured(true); + desc.staticallyConfigured(true); + desc.receivedFrom(ctx.localNodeId()); - if (!cacheType.userCache()) - stopSeq.addLast(cfg.getName()); - else - stopSeq.addFirst(cfg.getName()); - } - else { - if (log.isDebugEnabled()) - log.debug("Use cache configuration as template: " + cfg); + if (!template) { + registeredCaches.put(masked, desc); - registeredTemplates.put(masked, desc); - } + ctx.discovery().setCacheFilter( + cfg.getName(), + cfg.getNodeFilter(), + cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED, + cfg.getCacheMode()); - if (cfg.getName() == null) { // Use cache configuration with null name as template. - DynamicCacheDescriptor desc0 = - new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid()); + ctx.discovery().addClientNode(cfg.getName(), + ctx.localNodeId(), + cfg.getNearConfiguration() != null); - desc0.locallyConfigured(true); - desc0.staticallyConfigured(true); + if (!cacheType.userCache()) + stopSeq.addLast(cfg.getName()); + else + stopSeq.addFirst(cfg.getName()); + } + else { + if (log.isDebugEnabled()) + log.debug("Use cache configuration as template: " + cfg); - registeredTemplates.put(masked, desc0); - } + registeredTemplates.put(masked, desc); } - // Start shared managers. - for (GridCacheSharedManager mgr : sharedCtx.managers()) - mgr.start(sharedCtx); + if (cfg.getName() == null) { // Use cache configuration with null name as template. 
+ DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx, + cfg, + cacheType, + true, + IgniteUuid.randomUuid()); - transactions = new IgniteTransactionsImpl(sharedCtx); + desc0.locallyConfigured(true); + desc0.staticallyConfigured(true); - if (log.isDebugEnabled()) - log.debug("Started cache processor."); + registeredTemplates.put(masked, desc0); + } } /** @@@ -787,27 -795,11 +808,27 @@@ } } } + + if (!tmpCacheCfg.isEmpty()) { + CacheConfiguration[] newCacheCfg = new CacheConfiguration[tmpCacheCfg.size()]; + + tmpCacheCfg.toArray(newCacheCfg); + + ctx.config().setCacheConfiguration(newCacheCfg); + } + + activeOnStart = currStatus; } + if (activeOnStart && !ctx.clientNode() && !ctx.isDaemon()) + sharedCtx.database().lock(); + + // Must start database before start first cache. + sharedCtx.database().onKernalStart(false); + // Start dynamic caches received from collect discovery data. for (DynamicCacheDescriptor desc : registeredCaches.values()) { - if (ctx.config().isDaemon() && !CU.isMarshallerCache(desc.cacheConfiguration().getName())) + if (ctx.config().isDaemon()) continue; desc.clearRemoteConfigurations(); @@@ -2054,83 -1957,93 +2083,101 @@@ clientNodesMap = U.newHashMap(caches.size()); - for (GridCacheAdapter<?, ?> cache : caches.values()) { - DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name())); + collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId); + } + else { - reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size()); ++ reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1); - if (desc == null) - continue; + clientNodesMap = ctx.discovery().clientNodesMap(); - // RequestId must be null because on different node will be different byte [] and - // we get duplicate discovery data, for more details see - // TcpDiscoveryNodeAddedMessage#addDiscoveryData. 
- DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( - null, cache.name(), null); + collectDataOnGridNode(reqs); + } - req.startCacheConfiguration(desc.cacheConfiguration()); + DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs); - req.cacheType(desc.cacheType()); + batch.clientNodes(clientNodesMap); - req.deploymentId(desc.deploymentId()); + batch.clientReconnect(reconnect); - req.receivedFrom(desc.receivedFrom()); + // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same. + batch.id(null); - reqs.add(req); + return batch; + } - Boolean nearEnabled = cache.isNear(); + /** + * @param reqs requests. + */ + private void collectDataOnGridNode(Collection<DynamicCacheChangeRequest> reqs) { + for (DynamicCacheDescriptor desc : registeredCaches.values()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); ++ // RequestId must be null because on different node will be different byte [] and ++ // we get duplicate discovery data, for more details see ++ // TcpDiscoveryNodeAddedMessage#addDiscoveryData. 
++ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(), ++ null); - Map<UUID, Boolean> map = U.newHashMap(1); + req.startCacheConfiguration(desc.cacheConfiguration()); - map.put(nodeId, nearEnabled); + req.cacheType(desc.cacheType()); - clientNodesMap.put(cache.name(), map); - } - } - else { - reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1); + req.deploymentId(desc.deploymentId()); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( - null, desc.cacheConfiguration().getName(), null); + req.receivedFrom(desc.receivedFrom()); - req.startCacheConfiguration(desc.cacheConfiguration()); + reqs.add(req); + } - req.cacheType(desc.cacheType()); + for (DynamicCacheDescriptor desc : registeredTemplates.values()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); ++ // RequestId must be null because on different node will be different byte [] and ++ // we get duplicate discovery data, for more details see ++ // TcpDiscoveryNodeAddedMessage#addDiscoveryData. ++ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, desc.cacheConfiguration().getName(), ++ null); - req.deploymentId(desc.deploymentId()); + req.startCacheConfiguration(desc.cacheConfiguration()); - req.receivedFrom(desc.receivedFrom()); + req.template(true); - reqs.add(req); - } + reqs.add(req); + } + } - for (DynamicCacheDescriptor desc : registeredTemplates.values()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( - null, desc.cacheConfiguration().getName(), null); + /** + * @param reqs requests. + * @param clientNodesMap Client nodes map. + * @param nodeId Node id. 
+ */ + private void collectDataOnReconnectingNode( + Collection<DynamicCacheChangeRequest> reqs, + Map<String, Map<UUID, Boolean>> clientNodesMap, + UUID nodeId + ) { + for (GridCacheAdapter<?, ?> cache : caches.values()) { + DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name())); - req.startCacheConfiguration(desc.cacheConfiguration()); + if (desc == null) + continue; - req.template(true); - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null); ++ DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(null, cache.name(), null); - //todo check why id removed - /* req.deploymentId(desc.deploymentId());*/ + req.startCacheConfiguration(desc.cacheConfiguration()); - reqs.add(req); - } + req.cacheType(desc.cacheType()); - clientNodesMap = ctx.discovery().clientNodesMap(); - } + req.deploymentId(desc.deploymentId()); - DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs); + req.receivedFrom(desc.receivedFrom()); - batch.clientNodes(clientNodesMap); + reqs.add(req); - batch.clientReconnect(reconnect); + Boolean nearEnabled = cache.isNear(); - //todo check - // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same. - batch.id(null); + Map<UUID, Boolean> map = U.newHashMap(1); - return batch; + map.put(nodeId, nearEnabled); + + clientNodesMap.put(cache.name(), map); + } } /** {@inheritDoc} */ @@@ -2590,143 -2534,6 +2664,141 @@@ } /** + * Resets cache state after the cache has been moved to recovery state. + * + * @param cacheNames Cache names. + * @return Future that will be completed when state is changed for all caches. 
+ */ + public IgniteInternalFuture<?> resetCacheState(Collection<String> cacheNames) { + checkEmptyTransactions(); + + if (F.isEmpty(cacheNames)) + cacheNames = registeredCaches.keySet(); + + Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size()); + + for (String cacheName : cacheNames) { + DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName)); + + if (desc == null) { + log.warning("Reset lost partition will not be executed, " + + "because cache with name:" + cacheName + " doesn't not exist"); + + continue; + } + + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( + UUID.randomUUID(), cacheName, ctx.localNodeId()); + + req.markResetLostPartitions(); + + reqs.add(req); + } + + GridCompoundFuture fut = new GridCompoundFuture(); + + for (DynamicCacheStartFuture f : initiateCacheChanges(reqs, false)) + fut.add(f); + + fut.markInitialized(); + + return fut; + } + + /** + * + */ + public Collection<DynamicCacheChangeRequest> startAllCachesRequests() throws IgniteCheckedException { + List<DynamicCacheChangeRequest> reqs = new ArrayList<>(); + + if (!ctx.config().isDaemon() && + sharedCtx.pageStore() != null && + sharedCtx.database().persistenceEnabled()) { + Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames(); + + for (String name : savedCacheNames) { + CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name); + + if (cfg != null) + reqs.add(createRequest(cfg, false)); + } + + for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { + if (!savedCacheNames.contains(cfg.getName())) + reqs.add(createRequest(cfg, true)); + } + } + else { + for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) + reqs.add(createRequest(cfg, true)); + } + + return reqs; + } + + /** + * + */ + public Collection<DynamicCacheChangeRequest> stopAllCachesRequests(){ + List<DynamicCacheChangeRequest> reqs = new ArrayList<>(); + + for (String cacheName : cacheNames()) { + 
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( + UUID.randomUUID(), cacheName, ctx.localNodeId()); + + DynamicCacheDescriptor desc = registeredCaches.get(cacheName); + + req.deploymentId(desc.deploymentId()); + req.stop(true); + req.destroy(false); + + reqs.add(req); + } + + return reqs; + } + + /** + * @param cfg Cache configuration. + */ + private DynamicCacheChangeRequest createRequest( + CacheConfiguration cfg, + boolean needInit + ) throws IgniteCheckedException { + assert cfg != null; + + cloneCheckSerializable(cfg); + + if (needInit){ + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg); + + initialize(cfg, cacheObjCtx); + } + + String cacheName = cfg.getName(); + + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest( + UUID.randomUUID(), cacheName, ctx.localNodeId()); + + req.startCacheConfiguration(cfg); + + req.template(cfg.getName() != null && cfg.getName().endsWith("*")); + + req.nearCacheConfiguration(cfg.getNearConfiguration()); + + req.deploymentId(IgniteUuid.randomUuid()); + + if (CU.isUtilityCache(cacheName)) + req.cacheType(CacheType.UTILITY); - else if (CU.isMarshallerCache(cacheName)) - req.cacheType(CacheType.MARSHALLER); + else if (internalCaches.contains(cacheName)) + req.cacheType(CacheType.INTERNAL); + else + req.cacheType(CacheType.USER); + + return req; + } + + /** * @param reqs Requests. * @param failIfExists Fail if exists flag. * @return Collection of futures. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 8c73026,787a767..eaa448f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@@ -928,20 -939,7 +928,20 @@@ public class GridCacheProxyImpl<K, V> i } /** {@inheritDoc} */ + @Override public Iterator<Cache.Entry<K, V>> scanIterator(boolean keepBinary, + @Nullable IgniteBiPredicate<Object, Object> p) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.scanIterator(keepBinary, p); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ - @Override public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) { + @Override public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) { CacheOperationContext prev = gate.enter(opCtx); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index adf4e96,39a3baa..ad9eeb1 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@@ -39,10 -38,8 +39,11 @@@ import org.apache.ignite.internal.manag import 
org.apache.ignite.internal.managers.deployment.GridDeploymentManager; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.database.IgniteCacheDatabaseSharedManager; + import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 16d71ee,78610a7..614b3e3 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@@ -125,14 -90,10 +124,15 @@@ public class GridCacheTtlManager extend /** {@inheritDoc} */ @Override public void printMemoryStats() { - X.println(">>>"); - X.println(">>> TTL processor memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + + try { + X.println(">>>"); - X.println(">>> TTL processor memory 
stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); ++ X.println(">>> TTL processor memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + + ", cache=" + cctx.name() + ']'); - X.println(">>> pendingEntriesSize: " + pendingEntries.size()); + X.println(">>> pendingEntriesSize: " + pendingSize()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to print statistics: " + e, e); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/81ae2d83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ----------------------------------------------------------------------
