http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index d54e06f..5cbe377 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -121,11 +121,7 @@ public class IgnitionEx { }; /** */ - private static ThreadLocal<Boolean> clientMode = new ThreadLocal<Boolean>() { - @Override protected Boolean initialValue() { - return null; - } - }; + private static ThreadLocal<Boolean> clientMode = new ThreadLocal<>(); /** * Checks runtime version to be 1.7.x or 1.8.x. @@ -196,7 +192,7 @@ public class IgnitionEx { * @return Client mode flag. */ public static boolean isClientMode() { - return clientMode.get(); + return clientMode.get() == null ? false : clientMode.get(); } /** @@ -1458,8 +1454,9 @@ public class IgnitionEx { DFLT_PUBLIC_KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP)); - // Pre-start all threads as they are guaranteed to be needed. - ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads(); + if (!myCfg.isClientMode()) + // Pre-start all threads as they are guaranteed to be needed. + ((ThreadPoolExecutor)execSvc).prestartAllCoreThreads(); // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. @@ -1471,7 +1468,7 @@ public class IgnitionEx { new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); // Pre-start all threads as they are guaranteed to be needed. - ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads(); + ((ThreadPoolExecutor)sysExecSvc).prestartAllCoreThreads(); // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. 
@@ -1764,20 +1761,14 @@ public class IgnitionEx { public void initializeDefaultCacheConfiguration(IgniteConfiguration cfg) throws IgniteCheckedException { List<CacheConfiguration> cacheCfgs = new ArrayList<>(); - boolean clientDisco = cfg.getDiscoverySpi() instanceof TcpClientDiscoverySpi; - - // Add marshaller and utility caches. - if (!clientDisco) { - cacheCfgs.add(marshallerSystemCache()); + cacheCfgs.add(marshallerSystemCache()); - cacheCfgs.add(utilitySystemCache()); - } + cacheCfgs.add(utilitySystemCache()); if (IgniteComponentType.HADOOP.inClassPath()) cacheCfgs.add(CU.hadoopSystemCache()); - if (cfg.getAtomicConfiguration() != null && !clientDisco) - cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration())); + cacheCfgs.add(atomicsSystemCache(cfg.getAtomicConfiguration())); CacheConfiguration[] userCaches = cfg.getCacheConfiguration(); @@ -1854,7 +1845,7 @@ public class IgnitionEx { if (cfg.getSwapSpaceSpi() == null) { boolean needSwap = false; - if (cfg.getCacheConfiguration() != null) { + if (cfg.getCacheConfiguration() != null && !Boolean.TRUE.equals(cfg.isClientMode())) { for (CacheConfiguration c : cfg.getCacheConfiguration()) { if (c.isSwapEnabled()) { needSwap = true; @@ -2005,7 +1996,6 @@ public class IgnitionEx { ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setCacheMode(cfg.getCacheMode()); ccfg.setNodeFilter(CacheConfiguration.ALL_NODES); - ccfg.setNearConfiguration(new NearCacheConfiguration()); if (cfg.getCacheMode() == PARTITIONED) ccfg.setBackups(cfg.getBackups());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java index 5dca2f2..21f2264 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextAdapter.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.marshaller.*; import org.jsr166.*; @@ -49,10 +50,29 @@ public abstract class MarshallerContextAdapter implements MarshallerContext { Enumeration<URL> urls = ldr.getResources(CLS_NAMES_FILE); - while (urls.hasMoreElements()) + boolean foundClsNames = false; + + while (urls.hasMoreElements()) { processResource(urls.nextElement()); - processResource(ldr.getResource(JDK_CLS_NAMES_FILE)); + foundClsNames = true; + } + + if (!foundClsNames) + throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " + + "[file=" + CLS_NAMES_FILE + ", ldr=" + ldr + ']'); + + URL jdkClsNames = ldr.getResource(JDK_CLS_NAMES_FILE); + + if (jdkClsNames == null) + throw new IgniteException("Failed to load class names properties file packaged with ignite binaries " + + "[file=" + JDK_CLS_NAMES_FILE + ", ldr=" + ldr + ']'); + + processResource(jdkClsNames); + + checkHasClassName(GridDhtPartitionFullMap.class.getName(), ldr, CLS_NAMES_FILE); + checkHasClassName(GridDhtPartitionMap.class.getName(), ldr, CLS_NAMES_FILE); + checkHasClassName(HashMap.class.getName(), ldr, JDK_CLS_NAMES_FILE); } catch 
(IOException e) { throw new IllegalStateException("Failed to initialize marshaller context.", e); @@ -60,6 +80,18 @@ public abstract class MarshallerContextAdapter implements MarshallerContext { } /** + * @param clsName Class name. + * @param ldr Class loader used to get properties file. + * @param fileName File name. + */ + private void checkHasClassName(String clsName, ClassLoader ldr, String fileName) { + if (!map.containsKey(clsName.hashCode())) + throw new IgniteException("Failed to read class name from class names properties file. " + + "Make sure class names properties file packaged with ignite binaries is not corrupted " + + "[clsName=" + clsName + ", fileName=" + fileName + ", ldr=" + ldr + ']'); + } + + /** * @param url Resource URL. * @throws IOException In case of error. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 85939a6..e614408 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -59,7 +59,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery( new ContinuousQueryListener(log, workDir), null, - true, + ctx.cache().marshallerCache().context().affinityNode(), true ); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java index ee32692..779b54d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java @@ -22,19 +22,17 @@ import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.typedef.internal.*; -import java.io.*; - /** * Custom event. */ public class DiscoveryCustomEvent extends DiscoveryEvent { /** */ private static final long serialVersionUID = 0L; - + /** * Built-in event type: custom event sent. * <br> - * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(Serializable)}. + * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)}. * <p> * * @see DiscoveryCustomEvent @@ -42,7 +40,7 @@ public class DiscoveryCustomEvent extends DiscoveryEvent { public static final int EVT_DISCOVERY_CUSTOM_EVT = 18; /** */ - private Serializable data; + private DiscoveryCustomMessage customMsg; /** Affinity topology version. */ private AffinityTopologyVersion affTopVer; @@ -57,15 +55,15 @@ public class DiscoveryCustomEvent extends DiscoveryEvent { /** * @return Data. */ - public Serializable data() { - return data; + public DiscoveryCustomMessage customMessage() { + return customMsg; } /** - * @param data New data. + * @param customMsg New customMessage. 
*/ - public void data(Serializable data) { - this.data = data; + public void customMessage(DiscoveryCustomMessage customMsg) { + this.customMsg = customMsg; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java index 11af716..6a6f22a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java @@ -73,6 +73,7 @@ public class IgfsMarshaller { } /** + * Serializes the message and sends it into the given output stream. * @param msg Message. * @param hdr Message header. * @param out Output. @@ -119,6 +120,7 @@ public class IgfsMarshaller { IgfsPathControlRequest req = (IgfsPathControlRequest)msg; + U.writeString(out, req.userName()); writePath(out, req.path()); writePath(out, req.destinationPath()); out.writeBoolean(req.flag()); @@ -236,6 +238,7 @@ public class IgfsMarshaller { case OPEN_CREATE: { IgfsPathControlRequest req = new IgfsPathControlRequest(); + req.userName(U.readString(in)); req.path(readPath(in)); req.destinationPath(readPath(in)); req.flag(in.readBoolean()); @@ -298,8 +301,6 @@ public class IgfsMarshaller { } } - assert msg != null; - msg.command(cmd); return msg; @@ -341,34 +342,4 @@ public class IgfsMarshaller { return null; } - - /** - * Writes string to output. - * - * @param out Data output. - * @param str String. - * @throws IOException If write failed. - */ - private void writeString(DataOutput out, @Nullable String str) throws IOException { - out.writeBoolean(str != null); - - if (str != null) - out.writeUTF(str); - } - - /** - * Reads string from input. 
- * - * @param in Data input. - * @return Read string. - * @throws IOException If read failed. - */ - @Nullable private String readString(DataInput in) throws IOException { - boolean hasStr = in.readBoolean(); - - if (hasStr) - return in.readUTF(); - - return null; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java index 7ed1619..2f6e6e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.igfs.common; import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -63,6 +64,9 @@ public class IgfsPathControlRequest extends IgfsMessage { /** Last modification time. */ private long modificationTime; + /** The user name this control request is made on behalf of. */ + private String userName; + /** * @param path Path. */ @@ -235,4 +239,22 @@ public class IgfsPathControlRequest extends IgfsMessage { @Override public String toString() { return S.toString(IgfsPathControlRequest.class, this, "cmd", command()); } + + /** + * Getter for the user name. + * @return user name. + */ + public final String userName() { + assert userName != null; + + return userName; + } + + /** + * Setter for the user name. + * @param userName the user name. 
+ */ + public final void userName(String userName) { + this.userName = IgfsUtils.fixUserName(userName); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index c93c059..bea4256 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -23,7 +23,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; -import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -31,7 +31,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.spi.*; -import org.apache.ignite.spi.swapspace.*; + import org.jetbrains.annotations.*; import javax.cache.expiry.*; @@ -439,46 +439,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan return ctx.cache().cache(cacheName).containsKey(key); } - @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val, - @Nullable ClassLoader ldr) { - assert ctx.swap().enabled(); - - try { - ctx.swap().write(spaceName, key, val, ldr); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - 
@SuppressWarnings({"unchecked"}) - @Nullable @Override public <T> T readFromSwap(String spaceName, SwapKey key, - @Nullable ClassLoader ldr) { - try { - assert ctx.swap().enabled(); - - return ctx.swap().readValue(spaceName, key, ldr); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - @Override public int partition(String cacheName, Object key) { return ctx.cache().cache(cacheName).affinity().partition(key); } - @Override public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) { - try { - assert ctx.swap().enabled(); - - ctx.swap().remove(spaceName, key, null, ldr); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - @Override public IgniteNodeValidationResult validateNode(ClusterNode node) { for (GridComponent comp : ctx) { IgniteNodeValidationResult err = comp.validateNode(node); @@ -508,26 +472,6 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan } } - @SuppressWarnings("unchecked") - @Nullable @Override public <V> V readValueFromOffheapAndSwap(@Nullable String spaceName, - Object key, @Nullable ClassLoader ldr) { - try { - IgniteInternalCache<Object, V> cache = ctx.cache().cache(spaceName); - - GridCacheContext cctx = cache.context(); - - if (cctx.isNear()) - cctx = cctx.near().dht().context(); - - GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true); - - return e != null ? 
CU.<V>value(e.value(), cctx, true) : null; - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - @Override public MessageFormatter messageFormatter() { return ctx.io().formatter(); } @@ -540,6 +484,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan return ctx.discovery().tryFailNode(nodeId); } + @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) { + ctx.timeout().addTimeoutObject(new GridSpiTimeoutObject(obj)); + } + + @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + ctx.timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj)); + } + /** * @param e Exception to handle. * @return GridSpiException Converted exception. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java index 2e80b6f..ce2a36c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java @@ -56,11 +56,10 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { private final GridMessageListener lsnr = new CheckpointRequestListener(); /** */ - private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap = new ConcurrentHashMap8<>(); + private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap; /** */ - private final Collection<IgniteUuid> closedSess = new GridBoundedConcurrentLinkedHashSet<>( - MAX_CLOSED_SESS, MAX_CLOSED_SESS, 0.75f, 256, PER_SEGMENT_Q); + private final Collection<IgniteUuid> closedSess; /** Grid 
marshaller. */ private final Marshaller marsh; @@ -72,6 +71,21 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { super(ctx, ctx.config().getCheckpointSpi()); marsh = ctx.config().getMarshaller(); + + if (enabled()) { + keyMap = new ConcurrentHashMap8<>(); + + closedSess = new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_SESS, + MAX_CLOSED_SESS, + 0.75f, + 256, + PER_SEGMENT_Q); + } + else { + keyMap = null; + + closedSess = null; + } } /** {@inheritDoc} */ @@ -112,7 +126,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @return Session IDs. */ public Collection<IgniteUuid> sessionIds() { - return new ArrayList<>(keyMap.keySet()); + return enabled() ? new ArrayList<>(keyMap.keySet()) : Collections.<IgniteUuid>emptyList(); } /** @@ -125,8 +139,17 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @return {@code true} if checkpoint has been actually saved, {@code false} otherwise. * @throws IgniteCheckedException Thrown in case of any errors. */ - public boolean storeCheckpoint(GridTaskSessionInternal ses, String key, Object state, ComputeTaskSessionScope scope, - long timeout, boolean override) throws IgniteCheckedException { + public boolean storeCheckpoint(GridTaskSessionInternal ses, + String key, + Object state, + ComputeTaskSessionScope scope, + long timeout, + boolean override) + throws IgniteCheckedException + { + if (!enabled()) + return false; + assert ses != null; assert key != null; @@ -239,6 +262,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @return Whether or not checkpoint was removed. */ public boolean removeCheckpoint(String key) { + if (!enabled()) + return false; + assert key != null; boolean rmv = false; @@ -256,6 +282,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @return Whether or not checkpoint was removed. 
*/ public boolean removeCheckpoint(GridTaskSessionInternal ses, String key) { + if (!enabled()) + return false; + assert ses != null; assert key != null; @@ -283,6 +312,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @throws IgniteCheckedException Thrown in case of any errors. */ @Nullable public Serializable loadCheckpoint(GridTaskSessionInternal ses, String key) throws IgniteCheckedException { + if (!enabled()) + return null; + assert ses != null; assert key != null; @@ -309,6 +341,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { * @param cleanup Whether cleanup or not. */ public void onSessionEnd(GridTaskSessionInternal ses, boolean cleanup) { + if (!enabled()) + return; + closedSess.add(ses.getId()); // If on task node. @@ -358,7 +393,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { @Override public void printMemoryStats() { X.println(">>>"); X.println(">>> Checkpoint manager memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> keyMap: " + keyMap.size()); + X.println(">>> keyMap: " + (keyMap != null ? 
keyMap.size() : 0)); } /** @@ -407,6 +442,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> { if (log.isDebugEnabled()) log.debug("Received checkpoint request: " + req); + if (!enabled()) + return; + IgniteUuid sesId = req.getSessionId(); if (closedSess.contains(sesId)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index c877d57..4382731 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1211,6 +1211,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) { if (p != null) { try { + if (p instanceof GridLifecycleAwareMessageFilter) + ((GridLifecycleAwareMessageFilter)p).initialize(ctx); + else + ctx.resource().injectGeneric(p); + addMessageListener(TOPIC_COMM_USER, new GridUserMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p)); } @@ -1695,13 +1700,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa throws IgniteCheckedException { this.topic = topic; this.predLsnr = predLsnr; - - if (predLsnr != null) { - if (predLsnr instanceof GridLifecycleAwareMessageFilter) - ((GridLifecycleAwareMessageFilter)predLsnr).initialize(ctx); - else - ctx.resource().injectGeneric(predLsnr); - } } /** {@inheritDoc} */ @@ -1724,69 +1722,84 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa 
return; } - Object msgBody = ioMsg.body(); - - assert msgBody != null || ioMsg.bodyBytes() != null; + busyLock.readLock(); try { - byte[] msgTopicBytes = ioMsg.topicBytes(); - - Object msgTopic = ioMsg.topic(); - - GridDeployment dep = ioMsg.deployment(); - - if (dep == null && ctx.config().isPeerClassLoadingEnabled() && - ioMsg.deploymentClassName() != null) { - dep = ctx.deploy().getGlobalDeployment( - ioMsg.deploymentMode(), - ioMsg.deploymentClassName(), - ioMsg.deploymentClassName(), - ioMsg.userVersion(), - nodeId, - ioMsg.classLoaderId(), - ioMsg.loaderParticipants(), - null); - - if (dep == null) - throw new IgniteDeploymentCheckedException( - "Failed to obtain deployment information for user message. " + - "If you are using custom message or topic class, try implementing " + - "GridPeerDeployAware interface. [msg=" + ioMsg + ']'); - - ioMsg.deployment(dep); // Cache deployment. + if (stopping) { + if (log.isDebugEnabled()) + log.debug("Received user message while stopping (will ignore) [nodeId=" + + nodeId + ", msg=" + msg + ']'); + + return; } - // Unmarshall message topic if needed. - if (msgTopic == null && msgTopicBytes != null) { - msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null); + Object msgBody = ioMsg.body(); - ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings. 
- } + assert msgBody != null || ioMsg.bodyBytes() != null; - if (!F.eq(topic, msgTopic)) - return; + try { + byte[] msgTopicBytes = ioMsg.topicBytes(); + + Object msgTopic = ioMsg.topic(); + + GridDeployment dep = ioMsg.deployment(); + + if (dep == null && ctx.config().isPeerClassLoadingEnabled() && + ioMsg.deploymentClassName() != null) { + dep = ctx.deploy().getGlobalDeployment( + ioMsg.deploymentMode(), + ioMsg.deploymentClassName(), + ioMsg.deploymentClassName(), + ioMsg.userVersion(), + nodeId, + ioMsg.classLoaderId(), + ioMsg.loaderParticipants(), + null); + + if (dep == null) + throw new IgniteDeploymentCheckedException( + "Failed to obtain deployment information for user message. " + + "If you are using custom message or topic class, try implementing " + + "GridPeerDeployAware interface. [msg=" + ioMsg + ']'); + + ioMsg.deployment(dep); // Cache deployment. + } - if (msgBody == null) { - msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null); + // Unmarshall message topic if needed. + if (msgTopic == null && msgTopicBytes != null) { + msgTopic = marsh.unmarshal(msgTopicBytes, dep != null ? dep.classLoader() : null); - ioMsg.body(msgBody); // Save body to avoid future unmarshallings. - } + ioMsg.topic(msgTopic); // Save topic to avoid future unmarshallings. + } - // Resource injection. - if (dep != null) - ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" + - msg + ']', e); - } + if (!F.eq(topic, msgTopic)) + return; + + if (msgBody == null) { + msgBody = marsh.unmarshal(ioMsg.bodyBytes(), dep != null ? dep.classLoader() : null); - if (msgBody != null) { - if (predLsnr != null) { - if (!predLsnr.apply(nodeId, msgBody)) - removeMessageListener(TOPIC_COMM_USER, this); + ioMsg.body(msgBody); // Save body to avoid future unmarshallings. + } + + // Resource injection. 
+ if (dep != null) + ctx.resource().inject(dep, dep.deployedClass(ioMsg.deploymentClassName()), msgBody); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal user message [node=" + nodeId + ", message=" + + msg + ']', e); + } + + if (msgBody != null) { + if (predLsnr != null) { + if (!predLsnr.apply(nodeId, msgBody)) + removeMessageListener(TOPIC_COMM_USER, this); + } } } + finally { + busyLock.readUnlock(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java new file mode 100644 index 0000000..2005d4e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.cluster.*; + +/** + * Listener interface. + */ +public interface CustomEventListener<T extends DiscoveryCustomMessage> { + /** + * @param snd Sender. + * @param msg Message. + */ + public void onCustomEvent(ClusterNode snd, T msg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java new file mode 100644 index 0000000..23f8bda --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.spi.discovery.*; +import org.jetbrains.annotations.*; + +/** + * + */ +class CustomMessageWrapper implements DiscoverySpiCustomMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final DiscoveryCustomMessage delegate; + + /** + * @param delegate Delegate. + */ + CustomMessageWrapper(DiscoveryCustomMessage delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { + DiscoveryCustomMessage res = delegate.ackMessage(); + + return res == null ? null : new CustomMessageWrapper(res); + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return delegate.isMutable(); + } + + /** + * @return Delegate. + */ + public DiscoveryCustomMessage delegate() { + return delegate; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return delegate.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java new file mode 100644 index 0000000..401486d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * + */ +public interface DiscoveryCustomMessage extends Serializable { + /** + * @return Unique custom message ID. + */ + public IgniteUuid id(); + + /** + * Whether or not minor version of topology should be increased on message receive. + * + * @return {@code true} if minor topology version should be increased. + * @see AffinityTopologyVersion#minorTopVer + */ + public boolean incrementMinorTopologyVersion(); + + /** + * Called when custom message has been handled by all nodes. + * + * @return Ack message or {@code null} if ack is not required. + */ + @Nullable public DiscoveryCustomMessage ackMessage(); + + /** + * @return {@code true} if message can be modified during listener notification. Changes will be sent to next nodes. 
+ */ + public boolean isMutable(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 0950774..71fbc61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.jobmetrics.*; import org.apache.ignite.internal.processors.security.*; +import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; @@ -165,10 +166,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final GridLocalMetrics metrics = createMetrics(); /** Metrics update worker. */ - private final MetricsUpdater metricsUpdater = new MetricsUpdater(); + private GridTimeoutProcessor.CancelableTask metricsUpdateTask; /** Custom event listener. */ - private GridPlainInClosure<Serializable> customEvtLsnr; + private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs = + new ConcurrentHashMap8<>(); /** Map of dynamic cache filters. 
*/ private Map<String, CachePredicate> registeredCaches = new HashMap<>(); @@ -176,6 +178,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + /** Received custom messages history. */ + private final ArrayDeque<IgniteUuid> rcvdCustomMsgs = new ArrayDeque<>(); + /** @param ctx Context. */ public GridDiscoveryManager(GridKernalContext ctx) { super(ctx, ctx.config().getDiscoverySpi()); @@ -214,6 +219,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @param cacheName Cache name. * @param filter Cache filter. + * @param nearEnabled Near enabled flag. * @param loc {@code True} if cache is local. */ public void setCacheFilter( @@ -240,12 +246,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * * @param cacheName Cache name. * @param clientNodeId Near node ID. + * @param nearEnabled Near enabled flag. */ public void addClientNode(String cacheName, UUID clientNodeId, boolean nearEnabled) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - if (predicate != null) - predicate.addClientNode(clientNodeId, nearEnabled); + if (pred != null) + pred.addClientNode(clientNodeId, nearEnabled); } /** @@ -279,17 +286,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - /** - * @param evtType Event type. - * @return Next affinity topology version. 
- */ - private AffinityTopologyVersion nextTopologyVersion(int evtType, long topVer) { - if (evtType == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) - minorTopVer++; - else if (evtType != EVT_NODE_METRICS_UPDATED) - minorTopVer = 0; - - return new AffinityTopologyVersion(topVer, minorTopVer); + /** {@inheritDoc} */ + @Override protected void onKernalStart0() throws IgniteCheckedException { + if (Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode()) + ctx.performance().add("Enable client mode for TcpDiscoverySpi " + + "(set TcpDiscoverySpi.forceServerMode to false)"); } /** {@inheritDoc} */ @@ -328,7 +329,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { checkSegmentOnStart(); } - new IgniteThread(metricsUpdater).start(); + metricsUpdateTask = ctx.timeout().schedule(new MetricsUpdater(), METRICS_UPDATE_FREQ, METRICS_UPDATE_FREQ); spi.setMetricsProvider(createMetricsProvider()); @@ -356,14 +357,41 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ClusterNode node, Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> snapshots, - @Nullable Serializable data + @Nullable DiscoverySpiCustomMessage spiCustomMsg ) { + DiscoveryCustomMessage customMsg = spiCustomMsg == null ? 
null + : ((CustomMessageWrapper)spiCustomMsg).delegate(); + + if (skipMessage(type, customMsg)) + return; + final ClusterNode locNode = localNode(); if (snapshots != null) topHist = snapshots; - AffinityTopologyVersion nextTopVer = nextTopologyVersion(type, topVer); + boolean verChanged; + + if (type == EVT_NODE_METRICS_UPDATED) + verChanged = false; + else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + assert customMsg != null; + + if (customMsg.incrementMinorTopologyVersion()) { + minorTopVer++; + + verChanged = true; + } + else + verChanged = false; + } + else { + minorTopVer = 0; + + verChanged = true; + } + + AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) { for (DiscoCache c : discoCacheHist.values()) @@ -373,19 +401,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { - try { - if (customEvtLsnr != null) - customEvtLsnr.apply(data); - } - catch (Exception e) { - U.error(log, "Failed to notify direct custom event listener: " + data, e); + for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) { + List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls); + + if (list != null) { + for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) { + try { + lsnr.onCustomEvent(node, customMsg); + } + catch (Exception e) { + U.error(log, "Failed to notify direct custom event listener: " + customMsg, e); + } + } + } } } // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. 
- if (type != EVT_NODE_METRICS_UPDATED) { + if (verChanged) { DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id()))); discoCacheHist.put(nextTopVer, cache); @@ -417,7 +452,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return; } - discoWrk.addEvent(type, nextTopVer, node, topSnapshot, data); + discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); } }); @@ -486,10 +521,43 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @param customEvtLsnr Custom event listener. + * @param type Message type. + * @param customMsg Custom message. + * @return {@code True} if should not process message. */ - public void setCustomEventListener(GridPlainInClosure<Serializable> customEvtLsnr) { - this.customEvtLsnr = customEvtLsnr; + private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg) { + if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + assert customMsg != null && customMsg.id() != null : customMsg; + + if (rcvdCustomMsgs.contains(customMsg.id())) { + if (log.isDebugEnabled()) + log.debug("Received duplicated custom message, will ignore [msg=" + customMsg + "]"); + + return true; + } + + rcvdCustomMsgs.addLast(customMsg.id()); + + while (rcvdCustomMsgs.size() > DISCOVERY_HISTORY_SIZE) + rcvdCustomMsgs.pollFirst(); + } + + return false; + } + + /** + * @param msgCls Message class. + * @param lsnr Custom event listener. 
+ */ + public <T extends DiscoveryCustomMessage> void setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) { + List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(msgCls); + + if (list == null) { + list = F.addIfAbsent(customEvtLsnrs, msgCls, + new CopyOnWriteArrayList<CustomEventListener<DiscoveryCustomMessage>>()); + } + + list.add((CustomEventListener<DiscoveryCustomMessage>)lsnr); } /** @@ -660,7 +728,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Map<Integer, CacheMetrics> metrics = null; for (GridCacheAdapter<?, ?> cache : caches) { - if (cache.context().started() && cache.configuration().isStatisticsEnabled()) { + if (cache.configuration().isStatisticsEnabled() && + cache.context().started() && + cache.context().affinity().affinityTopologyVersion().topologyVersion() > 0) { if (metrics == null) metrics = U.newHashMap(caches.size()); @@ -952,11 +1022,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { getSpi().setListener(null); // Stop discovery worker and metrics updater. + U.closeQuiet(metricsUpdateTask); + U.cancel(discoWrk); - U.cancel(metricsUpdater); U.join(discoWrk, log); - U.join(metricsUpdater, log); // Stop SPI itself. stopSpi(); @@ -1218,13 +1288,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets alive remote nodes with at least one cache configured. + * Gets alive remote server nodes with at least one cache configured. * * @param topVer Topology version (maximum allowed node order). * @return Collection of alive cache nodes. 
*/ - public Collection<ClusterNode> aliveRemoteNodesWithCaches(AffinityTopologyVersion topVer) { - return resolveDiscoCache(null, topVer).aliveRemoteNodesWithCaches(topVer.topologyVersion()); + public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) { + return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion()); + } + + /** + * Gets alive server nodes with at least one cache configured. + * + * @param topVer Topology version (maximum allowed node order). + * @return Collection of alive cache nodes. + */ + public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) { + return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion()); } /** @@ -1256,9 +1336,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if node is a cache data node. */ public boolean cacheAffinityNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.dataNode(node); + return pred != null && pred.dataNode(node); } /** @@ -1267,9 +1347,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if node has near cache enabled. */ public boolean cacheNearNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.nearNode(node); + return pred != null && pred.nearNode(node); } /** @@ -1278,9 +1358,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if node has client cache (without near cache). 
*/ public boolean cacheClientNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.clientNode(node); + return pred != null && pred.clientNode(node); } /** @@ -1289,9 +1369,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return If cache with the given name is accessible on the given node. */ public boolean cacheNode(ClusterNode node, String cacheName) { - CachePredicate predicate = registeredCaches.get(cacheName); + CachePredicate pred = registeredCaches.get(cacheName); - return predicate != null && predicate.cacheNode(node); + return pred != null && pred.cacheNode(node); } /** @@ -1384,10 +1464,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @param evt Event. + * @param msg Custom message. */ - public void sendCustomEvent(Serializable evt) { - getSpi().sendCustomEvent(evt); + public void sendCustomEvent(DiscoveryCustomMessage msg) { + getSpi().sendCustomEvent(new CustomMessageWrapper(msg)); } /** @@ -1542,8 +1622,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Worker for discovery events. */ private class DiscoveryWorker extends GridWorker { /** Event queue. */ - private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable>> evts = - new LinkedBlockingQueue<>(); + private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, + DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); /** Node segmented event fired flag. 
*/ private boolean nodeSegFired; @@ -1609,9 +1689,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { AffinityTopologyVersion topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, - @Nullable Serializable data + @Nullable DiscoveryCustomMessage data ) { - assert node != null; + assert node != null : data; evts.add(F.t(type, topVer, node, topSnapshot, data)); } @@ -1650,7 +1730,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @throws InterruptedException If interrupted. */ @SuppressWarnings("DuplicateCondition") private void body0() throws InterruptedException { - GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable> evt = evts.take(); + GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, + DiscoveryCustomMessage> evt = evts.take(); int type = evt.get1(); @@ -1768,7 +1849,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { customEvt.type(type); customEvt.topologySnapshot(topVer.topologyVersion(), null); customEvt.affinityTopologyVersion(topVer); - customEvt.data(evt.get5()); + customEvt.customMessage(evt.get5()); ctx.event().record(customEvt); } @@ -1833,28 +1914,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * */ - private class MetricsUpdater extends GridWorker { + private class MetricsUpdater implements Runnable { /** */ private long prevGcTime = -1; /** */ private long prevCpuTime = -1; - /** - * - */ - private MetricsUpdater() { - super(ctx.gridName(), "metrics-updater", GridDiscoveryManager.this.log); - } - /** {@inheritDoc} */ - @Override protected void body() throws IgniteInterruptedCheckedException { - while (!isCancelled()) { - U.sleep(METRICS_UPDATE_FREQ); - - gcCpuLoad = getGcCpuLoad(); - cpuLoad = getCpuLoad(); - } + @Override public void run() { + gcCpuLoad = getGcCpuLoad(); + cpuLoad = getCpuLoad(); } /** @@ -2065,9 +2135,14 @@ public 
class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final Collection<ClusterNode> aliveNodesWithCaches; /** - * Cached alive remote nodes with caches. + * Cached alive server nodes with caches. + */ + private final Collection<ClusterNode> aliveSrvNodesWithCaches; + + /** + * Cached alive remote server nodes with caches. */ - private final Collection<ClusterNode> aliveRmtNodesWithCaches; + private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches; /** * @param loc Local node. @@ -2088,21 +2163,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { all.addAll(rmtNodes); + Collections.sort(all, GridNodeOrderComparator.INSTANCE); + allNodes = Collections.unmodifiableList(all); - Map<String, Collection<ClusterNode>> cacheMap = - new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> rmtCacheMap = - new HashMap<>(allNodes.size(), 1.0f); - Map<String, Collection<ClusterNode>> dhtNodesMap = - new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> dhtNodesMap =new HashMap<>(allNodes.size(), 1.0f); Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size()); Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size()); aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); aliveNodesWithCaches = new ConcurrentSkipListSet<>(); - aliveRmtNodesWithCaches = new ConcurrentSkipListSet<>(); + aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>(); + aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>(); nodesByVer = new TreeMap<>(); long maxOrder0 = 0; @@ -2154,8 +2229,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (alive(node.id())) { 
aliveNodesWithCaches.add(node); - if (!loc.id().equals(node.id())) - aliveRmtNodesWithCaches.add(node); + if (!CU.clientNode(node)) { + aliveSrvNodesWithCaches.add(node); + + if (!loc.id().equals(node.id())) + aliveRmtSrvNodesWithCaches.add(node); + } } } @@ -2240,13 +2319,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @return All nodes with at least one cache configured. - */ - Collection<ClusterNode> allNodesWithCaches() { - return allNodesWithCaches; - } - - /** * Gets collection of nodes which have version equal or greater than {@code ver}. * * @param ver Version to check. @@ -2345,13 +2417,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * Gets all alive remote nodes with at least one cache configured. + * Gets all alive remote server nodes with at least one cache configured. * * @param topVer Topology version. * @return Collection of nodes. */ - Collection<ClusterNode> aliveRemoteNodesWithCaches(final long topVer) { - return filter(topVer, aliveRmtNodesWithCaches); + Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) { + return filter(topVer, aliveRmtSrvNodesWithCaches); + } + + /** + * Gets all alive server nodes with at least one cache configured. + * + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) { + return filter(topVer, aliveSrvNodesWithCaches); } /** @@ -2388,7 +2470,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { filterNodeMap(aliveRmtCacheNodes, leftNode); aliveNodesWithCaches.remove(leftNode); - aliveRmtNodesWithCaches.remove(leftNode); + aliveSrvNodesWithCaches.remove(leftNode); + aliveRmtSrvNodesWithCaches.remove(leftNode); } /** @@ -2480,11 +2563,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private boolean loc; /** Collection of client near nodes. 
*/ - private Map<UUID, Boolean> clientNodes; + private ConcurrentHashMap<UUID, Boolean> clientNodes; /** * @param cacheFilter Cache filter. * @param nearEnabled Near enabled flag. + * @param loc {@code True} if cache is local. */ private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, boolean loc) { assert cacheFilter != null; @@ -2498,9 +2582,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @param nodeId Near node ID to add. + * @param nearEnabled Near enabled flag. */ public void addClientNode(UUID nodeId, boolean nearEnabled) { - clientNodes.put(nodeId, nearEnabled); + clientNodes.putIfAbsent(nodeId, nearEnabled); } /** @@ -2515,7 +2600,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if this node is a data node for given cache. */ public boolean dataNode(ClusterNode node) { - return !node.isDaemon() && cacheFilter.apply(node); + return !node.isDaemon() && CU.affinityNode(node, cacheFilter); } /** @@ -2523,8 +2608,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if cache is accessible on the given node. 
*/ public boolean cacheNode(ClusterNode node) { - return !node.isClient() && !node.isDaemon() && - (cacheFilter.apply(node) || clientNodes.containsKey(node.id())); + return !node.isDaemon() && (CU.affinityNode(node, cacheFilter) || clientNodes.containsKey(node.id())); } /** @@ -2535,8 +2619,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (node.isDaemon()) return false; - if (nearEnabled && cacheFilter.apply(node)) - return true; + if (CU.affinityNode(node, cacheFilter)) + return nearEnabled; Boolean near = clientNodes.get(node.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java index 9a81cd1..f1561bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.indexing.*; @@ -46,9 +45,6 @@ public class GridIndexingManager extends GridManagerAdapter<IndexingSpi> { * @throws IgniteCheckedException Thrown in case of any errors. 
*/ @Override public void start() throws IgniteCheckedException { - if (!enabled()) - U.warn(log, "Indexing is disabled (to enable please configure GridIndexingSpi)."); - startSpi(); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index e9df8b8..5373e46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -68,6 +68,18 @@ class GridAffinityAssignment implements Serializable { } /** + * @param topVer Topology version. + * @param aff Assignment to copy from. + */ + GridAffinityAssignment(AffinityTopologyVersion topVer, GridAffinityAssignment aff) { + this.topVer = topVer; + + assignment = aff.assignment; + primary = aff.primary; + backup = aff.backup; + } + + /** * @return Affinity assignment. 
*/ public List<List<ClusterNode>> assignment() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index eccd9f9..c46490e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -32,6 +32,8 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; + /** * Affinity cached function. */ @@ -221,6 +223,35 @@ public class GridAffinityAssignmentCache { } /** + * Copies previous affinity assignment when discovery event does not cause affinity assignment changes + * (e.g. client node joins or leaves). + * + * @param evt Event. + * @param topVer Topology version. 
+ */ + public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) { + GridAffinityAssignment aff = head.get(); + + assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt; + assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt; + + GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff); + + affCache.put(topVer, assignmentCpy); + head.set(assignmentCpy); + + for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) { + if (entry.getKey().compareTo(topVer) <= 0) { + if (log.isDebugEnabled()) + log.debug("Completing topology ready future (use previous affinity) " + + "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']'); + + entry.getValue().onDone(topVer); + } + } + } + + /** * @return Last calculated affinity version. */ public AffinityTopologyVersion lastVersion() { @@ -422,6 +453,7 @@ public class GridAffinityAssignmentCache { /** * + * @param reqTopVer Required topology version. 
*/ private AffinityReadyFuture(AffinityTopologyVersion reqTopVer) { this.reqTopVer = reqTopVer; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index daa2bc2..aac63c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -164,14 +164,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * * @param cacheName Cache name. * @param key Key to map. + * @param topVer Topology version. * @return Affinity nodes, primary first. * @throws IgniteCheckedException If failed. */ - public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) throws IgniteCheckedException { + public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, + K key, + AffinityTopologyVersion topVer) + throws IgniteCheckedException + { A.notNull(key, "key"); - AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); - AffinityInfo affInfo = affinityCache(cacheName, topVer); if (affInfo == null) @@ -181,6 +184,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** + * Map single key to primary and backup nodes. + * + * @param cacheName Cache name. + * @param key Key to map. + * @return Affinity nodes, primary first. + * @throws IgniteCheckedException If failed. 
+ */ + public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) + throws IgniteCheckedException + { + return mapKeyToPrimaryAndBackups(cacheName, key, ctx.discovery().topologyVersionEx()); + } + + /** * Gets affinity key for cache key. * * @param cacheName Cache name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java index 5d6062e..7a3fbee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; import java.util.*; @@ -91,6 +92,36 @@ public class CacheEvictableEntryImpl<K, V> implements EvictableEntry<K, V> { } /** {@inheritDoc} */ + public int size() { + try { + GridCacheContext<Object, Object> cctx = cached.context(); + + KeyCacheObject key = cached.key(); + + byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext()); + + byte[] valBytes = null; + + if (cctx.useOffheapEntry()) + valBytes = cctx.offheap().get(cctx.swap().spaceName(), cached.partition(), key, keyBytes); + else { + CacheObject cacheObj = cached.valueBytes(); + + if (cacheObj != null) + valBytes = cacheObj.valueBytes(cctx.cacheObjectContext()); + } + + return valBytes == null ? 
keyBytes.length : keyBytes.length + valBytes.length; + } + catch (GridCacheEntryRemovedException e) { + return 0; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public V getValue() { try {
