http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java deleted file mode 100644 index 636cecf..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignmentCache.java +++ /dev/null @@ -1,409 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.affinity; - -import org.apache.ignite.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.portables.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Affinity cached function. - */ -public class GridAffinityAssignmentCache { - /** Node order comparator. */ - private static final Comparator<ClusterNode> nodeCmp = new GridNodeOrderComparator(); - - /** Cache name. */ - private final String cacheName; - - /** Number of backups. */ - private int backups; - - /** Affinity function. */ - private final GridCacheAffinityFunction aff; - - /** Partitions count. */ - private final int partsCnt; - - /** Affinity mapper function. */ - private final GridCacheAffinityKeyMapper affMapper; - - /** Affinity calculation results cache: topology version => partition => nodes. */ - private final ConcurrentMap<Long, GridAffinityAssignment> affCache; - - /** Cache item corresponding to the head topology version. */ - private final AtomicReference<GridAffinityAssignment> head; - - /** Discovery manager. */ - private final GridCacheContext ctx; - - /** Ready futures. */ - private final ConcurrentMap<Long, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>(); - - /** Log. */ - private IgniteLogger log; - - /** - * Constructs affinity cached calculations. - * - * @param ctx Kernal context. - * @param cacheName Cache name. - * @param aff Affinity function. - * @param affMapper Affinity key mapper. - */ - @SuppressWarnings("unchecked") - public GridAffinityAssignmentCache(GridCacheContext ctx, String cacheName, GridCacheAffinityFunction aff, - GridCacheAffinityKeyMapper affMapper, int backups) { - this.ctx = ctx; - this.aff = aff; - this.affMapper = affMapper; - this.cacheName = cacheName; - this.backups = backups; - - log = ctx.logger(GridAffinityAssignmentCache.class); - - partsCnt = aff.partitions(); - affCache = new ConcurrentLinkedHashMap<>(); - head = new AtomicReference<>(new GridAffinityAssignment(-1)); - } - - /** - * Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes - * and brought to local node on partition map exchange. - * - * @param topVer Topology version. - * @param affAssignment Affinity assignment for topology version. - */ - public void initialize(long topVer, List<List<ClusterNode>> affAssignment) { - GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment); - - affCache.put(topVer, assignment); - head.set(assignment); - - for (Map.Entry<Long, AffinityReadyFuture> entry : readyFuts.entrySet()) { - if (entry.getKey() >= topVer) - entry.getValue().onDone(topVer); - } - } - - /** - * Calculates affinity cache for given topology version. - * - * @param topVer Topology version to calculate affinity cache for. - * @param discoEvt Discovery event that caused this topology version change. - */ - public List<List<ClusterNode>> calculate(long topVer, IgniteDiscoveryEvent discoEvt) { - if (log.isDebugEnabled()) - log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() + - ", discoEvt=" + discoEvt + ']'); - - GridAffinityAssignment prev = affCache.get(topVer - 1); - - List<ClusterNode> sorted; - - if (ctx.isLocal()) - // For local cache always use local node. - sorted = Collections.singletonList(ctx.localNode()); - else { - // Resolve nodes snapshot for specified topology version. - Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer); - - sorted = sort(nodes); - } - - List<List<ClusterNode>> prevAssignment = prev == null ? null : prev.assignment(); - - List<List<ClusterNode>> assignment = aff.assignPartitions( - new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, topVer, backups)); - - GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment); - - updated = F.addIfAbsent(affCache, topVer, updated); - - // Update top version, if required. - while (true) { - GridAffinityAssignment headItem = head.get(); - - if (headItem.topologyVersion() >= topVer) - break; - - if (head.compareAndSet(headItem, updated)) - break; - } - - for (Map.Entry<Long, AffinityReadyFuture> entry : readyFuts.entrySet()) { - if (entry.getKey() <= topVer) { - if (log.isDebugEnabled()) - log.debug("Completing topology ready future (calculated affinity) [locNodeId=" + ctx.localNodeId() + - ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']'); - - entry.getValue().onDone(topVer); - } - } - - return updated.assignment(); - } - - /** - * @return Last calculated affinity version. - */ - public long lastVersion() { - return head.get().topologyVersion(); - } - - /** - * Clean up outdated cache items. - * - * @param topVer Actual topology version, older versions will be removed. - */ - public void cleanUpCache(long topVer) { - if (log.isDebugEnabled()) - log.debug("Cleaning up cache for version [locNodeId=" + ctx.localNodeId() + - ", topVer=" + topVer + ']'); - - for (Iterator<Long> it = affCache.keySet().iterator(); it.hasNext(); ) - if (it.next() < topVer) - it.remove(); - } - - /** - * @param topVer Topology version. - * @return Affinity assignment. - */ - public List<List<ClusterNode>> assignments(long topVer) { - GridAffinityAssignment aff = cachedAffinity(topVer); - - return aff.assignment(); - } - - /** - * Gets future that will be completed after topology with version {@code topVer} is calculated. - * - * @param topVer Topology version to await for. - * @return Future that will be completed after affinity for topology version {@code topVer} is calculated. - */ - public IgniteFuture<Long> readyFuture(long topVer) { - GridAffinityAssignment aff = head.get(); - - if (aff.topologyVersion() >= topVer) { - if (log.isDebugEnabled()) - log.debug("Returning finished future for readyFuture [head=" + aff.topologyVersion() + - ", topVer=" + topVer + ']'); - - return null; - } - - GridFutureAdapter<Long> fut = F.addIfAbsent(readyFuts, topVer, - new AffinityReadyFuture(ctx.kernalContext())); - - aff = head.get(); - - if (aff.topologyVersion() >= topVer) { - if (log.isDebugEnabled()) - log.debug("Completing topology ready future right away [head=" + aff.topologyVersion() + - ", topVer=" + topVer + ']'); - - fut.onDone(topVer); - } - - return fut; - } - - /** - * @return Partition count. - */ - public int partitions() { - return partsCnt; - } - - /** - * NOTE: Use this method always when you need to calculate partition id for - * a key provided by user. It's required since we should apply affinity mapper - * logic in order to find a key that will eventually be passed to affinity function. - * - * @param key Key. - * @return Partition. - */ - public int partition(Object key) { - if (ctx.portableEnabled()) { - try { - key = ctx.marshalToPortable(key); - } - catch (PortableException e) { - U.error(log, "Failed to marshal key to portable: " + key, e); - } - } - - return aff.partition(affMapper.affinityKey(key)); - } - - /** - * Gets affinity nodes for specified partition. - * - * @param part Partition. - * @param topVer Topology version. - * @return Affinity nodes. - */ - public List<ClusterNode> nodes(int part, long topVer) { - // Resolve cached affinity nodes. - return cachedAffinity(topVer).get(part); - } - - /** - * Get primary partitions for specified node ID. - * - * @param nodeId Node ID to get primary partitions for. - * @param topVer Topology version. - * @return Primary partitions for specified node ID. - */ - public Set<Integer> primaryPartitions(UUID nodeId, long topVer) { - return cachedAffinity(topVer).primaryPartitions(nodeId); - } - - /** - * Get backup partitions for specified node ID. - * - * @param nodeId Node ID to get backup partitions for. - * @param topVer Topology version. - * @return Backup partitions for specified node ID. - */ - public Set<Integer> backupPartitions(UUID nodeId, long topVer) { - return cachedAffinity(topVer).backupPartitions(nodeId); - } - - /** - * Get cached affinity for specified topology version. - * - * @param topVer Topology version. - * @return Cached affinity. - */ - private GridAffinityAssignment cachedAffinity(long topVer) { - if (topVer == -1) - topVer = lastVersion(); - else - awaitTopologyVersion(topVer); - - assert topVer >= 0 : topVer; - - GridAffinityAssignment cache = head.get(); - - if (cache.topologyVersion() != topVer) { - cache = affCache.get(topVer); - - if (cache == null) { - throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " + - "calculated [locNodeId=" + ctx.localNodeId() + ", topVer=" + topVer + - ", head=" + head.get().topologyVersion() + ']'); - } - } - - assert cache.topologyVersion() == topVer : "Invalid cached affinity: " + cache; - - return cache; - } - - /** - * @param topVer Topology version to wait. - */ - private void awaitTopologyVersion(long topVer) { - GridAffinityAssignment aff = head.get(); - - if (aff.topologyVersion() >= topVer) - return; - - try { - if (log.isDebugEnabled()) - log.debug("Will wait for topology version [locNodeId=" + ctx.localNodeId() + - ", topVer=" + topVer + ']'); - - IgniteFuture<Long> fut = readyFuture(topVer); - - if (fut != null) - fut.get(); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to wait for affinity ready future for topology version: " + topVer, - e); - } - } - - /** - * Sorts nodes according to order. - * - * @param nodes Nodes to sort. - * @return Sorted list of nodes. - */ - private List<ClusterNode> sort(Collection<ClusterNode> nodes) { - List<ClusterNode> sorted = new ArrayList<>(nodes.size()); - - sorted.addAll(nodes); - - Collections.sort(sorted, nodeCmp); - - return sorted; - } - - /** - * Affinity ready future. Will remove itself from ready futures map. - */ - private class AffinityReadyFuture extends GridFutureAdapter<Long> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public AffinityReadyFuture() { - // No-op. - } - - /** - * @param ctx Kernal context. - */ - private AffinityReadyFuture(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public boolean onDone(Long res, @Nullable Throwable err) { - assert res != null; - - boolean done = super.onDone(res, err); - - if (done) - readyFuts.remove(res, this); - - return done; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityMessage.java deleted file mode 100644 index 078fe40..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityMessage.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.affinity; - -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.tostring.*; - -import java.io.*; -import java.util.*; - -/** - * Object wrapper containing serialized byte array of original object and deployment information. - */ -class GridAffinityMessage implements Externalizable, IgniteOptimizedMarshallable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"}) - private static Object GG_CLASS_ID; - - /** */ - private byte[] src; - - /** */ - private IgniteUuid clsLdrId; - - /** */ - private IgniteDeploymentMode depMode; - - /** */ - private String srcClsName; - - /** */ - private String userVer; - - /** Node class loader participants. */ - @GridToStringInclude - private Map<UUID, IgniteUuid> ldrParties; - - /** - * @param src Source object. - * @param srcClsName Source object class name. - * @param clsLdrId Class loader ID. - * @param depMode Deployment mode. - * @param userVer User version. - * @param ldrParties Node loader participant map. - */ - GridAffinityMessage( - byte[] src, - String srcClsName, - IgniteUuid clsLdrId, - IgniteDeploymentMode depMode, - String userVer, - Map<UUID, IgniteUuid> ldrParties) { - this.src = src; - this.srcClsName = srcClsName; - this.depMode = depMode; - this.clsLdrId = clsLdrId; - this.userVer = userVer; - this.ldrParties = ldrParties; - } - - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridAffinityMessage() { - // No-op. - } - - /** - * @return Source object. - */ - public byte[] source() { - return src; - } - - /** - * @return the Class loader ID. - */ - public IgniteUuid classLoaderId() { - return clsLdrId; - } - - /** - * @return Deployment mode. - */ - public IgniteDeploymentMode deploymentMode() { - return depMode; - } - - /** - * @return Source message class name. - */ - public String sourceClassName() { - return srcClsName; - } - - /** - * @return User version. - */ - public String userVersion() { - return userVer; - } - - /** - * @return Node class loader participant map. - */ - public Map<UUID, IgniteUuid> loaderParticipants() { - return ldrParties != null ? Collections.unmodifiableMap(ldrParties) : null; - } - - /** {@inheritDoc} */ - @Override public Object ggClassId() { - return GG_CLASS_ID; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeByteArray(out, src); - - out.writeInt(depMode.ordinal()); - - U.writeGridUuid(out, clsLdrId); - U.writeString(out, srcClsName); - U.writeString(out, userVer); - U.writeMap(out, ldrParties); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - src = U.readByteArray(in); - - depMode = IgniteDeploymentMode.fromOrdinal(in.readInt()); - - clsLdrId = U.readGridUuid(in); - srcClsName = U.readString(in); - userVer = U.readString(in); - ldrParties = U.readMap(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridAffinityMessage.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java deleted file mode 100644 index 108cf61..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java +++ /dev/null @@ -1,528 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.affinity; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.managers.eventstorage.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.timeout.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.internal.GridClosureCallMode.*; -import static org.gridgain.grid.kernal.processors.affinity.GridAffinityUtils.*; - -/** - * Data affinity processor. - */ -public class GridAffinityProcessor extends GridProcessorAdapter { - /** Affinity map cleanup delay (ms). */ - private static final long AFFINITY_MAP_CLEAN_UP_DELAY = 3000; - - /** Retries to get affinity in case of error. */ - private static final int ERROR_RETRIES = 3; - - /** Time to wait between errors (in milliseconds). */ - private static final long ERROR_WAIT = 500; - - /** Null cache name. */ - private static final String NULL_NAME = U.id8(UUID.randomUUID()); - - /** Affinity map. */ - private final ConcurrentMap<AffinityAssignmentKey, IgniteFuture<AffinityInfo>> affMap = new ConcurrentHashMap8<>(); - - /** Listener. */ - private final GridLocalEventListener lsnr = new GridLocalEventListener() { - @Override public void onEvent(IgniteEvent evt) { - int evtType = evt.type(); - - assert evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_JOINED; - - if (affMap.isEmpty()) - return; // Skip empty affinity map. - - final IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; - - // Clean up affinity functions if such cache no more exists. - if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) { - final Collection<String> caches = new HashSet<>(); - - for (ClusterNode clusterNode : ctx.discovery().allNodes()) - caches.addAll(U.cacheNames(clusterNode)); - - final Collection<AffinityAssignmentKey> rmv = new GridLeanSet<>(); - - for (AffinityAssignmentKey key : affMap.keySet()) { - if (!caches.contains(key.cacheName) || key.topVer < discoEvt.topologyVersion() - 1) - rmv.add(key); - } - - ctx.timeout().addTimeoutObject(new GridTimeoutObjectAdapter( - IgniteUuid.fromUuid(ctx.localNodeId()), AFFINITY_MAP_CLEAN_UP_DELAY) { - @Override public void onTimeout() { - affMap.keySet().removeAll(rmv); - } - }); - } - } - }; - - /** - * @param ctx Context. - */ - public GridAffinityProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - if (ctx != null && ctx.event() != null) - ctx.event().removeLocalEventListener(lsnr); - } - - /** - * Maps keys to nodes for given cache. - * - * @param cacheName Cache name. - * @param keys Keys to map. - * @return Map of nodes to keys. - * @throws IgniteCheckedException If failed. - */ - public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName, - @Nullable Collection<? extends K> keys) throws IgniteCheckedException { - return keysToNodes(cacheName, keys); - } - - /** - * Maps keys to nodes on default cache. - * - * @param keys Keys to map. - * @return Map of nodes to keys. - * @throws IgniteCheckedException If failed. - */ - public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) - throws IgniteCheckedException { - return keysToNodes(null, keys); - } - - /** - * Maps single key to a node. - * - * @param cacheName Cache name. - * @param key Key to map. - * @return Picked node. - * @throws IgniteCheckedException If failed. - */ - @Nullable public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteCheckedException { - Map<ClusterNode, Collection<K>> map = keysToNodes(cacheName, F.asList(key)); - - return map != null ? F.first(map.keySet()) : null; - } - - /** - * Maps single key to a node. - * - * @param cacheName Cache name. - * @param key Key to map. - * @return Picked node. - * @throws IgniteCheckedException If failed. - */ - @Nullable public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key, long topVer) throws IgniteCheckedException { - Map<ClusterNode, Collection<K>> map = keysToNodes(cacheName, F.asList(key), topVer); - - return map != null ? F.first(map.keySet()) : null; - } - - /** - * Maps single key to a node on default cache. - * - * @param key Key to map. - * @return Picked node. - * @throws IgniteCheckedException If failed. - */ - @Nullable public <K> ClusterNode mapKeyToNode(K key) throws IgniteCheckedException { - return mapKeyToNode(null, key); - } - - /** - * Gets affinity key for cache key. - * - * @param cacheName Cache name. - * @param key Cache key. - * @return Affinity key. - * @throws IgniteCheckedException In case of error. - */ - @SuppressWarnings("unchecked") - @Nullable public Object affinityKey(@Nullable String cacheName, @Nullable Object key) throws IgniteCheckedException { - if (key == null) - return null; - - AffinityInfo affInfo = affinityCache(cacheName, ctx.discovery().topologyVersion()); - - if (affInfo == null || affInfo.mapper == null) - return null; - - if (affInfo.portableEnabled) - key = ctx.portable().marshalToPortable(key); - - return affInfo.mapper.affinityKey(key); - } - - /** - * @param cacheName Cache name. - * @return Non-null cache name. - */ - private String maskNull(@Nullable String cacheName) { - return cacheName == null ? NULL_NAME : cacheName; - } - - /** - * @param cacheName Cache name. - * @param keys Keys. - * @return Affinity map. - * @throws IgniteCheckedException If failed. - */ - private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName, - Collection<? extends K> keys) throws IgniteCheckedException { - return keysToNodes(cacheName, keys, ctx.discovery().topologyVersion()); - } - - /** - * @param cacheName Cache name. - * @param keys Keys. - * @param topVer Topology version. - * @return Affinity map. - * @throws IgniteCheckedException If failed. - */ - private <K> Map<ClusterNode, Collection<K>> keysToNodes(@Nullable final String cacheName, - Collection<? extends K> keys, long topVer) throws IgniteCheckedException { - if (F.isEmpty(keys)) - return Collections.emptyMap(); - - ClusterNode loc = ctx.discovery().localNode(); - - if (U.hasCache(loc, cacheName) && ctx.cache().cache(cacheName).configuration().getCacheMode() == LOCAL) - return F.asMap(loc, (Collection<K>)keys); - - AffinityInfo affInfo = affinityCache(cacheName, topVer); - - return affInfo != null ? affinityMap(affInfo, keys) : Collections.<ClusterNode, Collection<K>>emptyMap(); - } - - /** - * @param cacheName Cache name. - * @return Affinity cache. - * @throws IgniteCheckedException In case of error. - */ - @SuppressWarnings("ErrorNotRethrown") - private AffinityInfo affinityCache(@Nullable final String cacheName, long topVer) throws IgniteCheckedException { - AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer); - - IgniteFuture<AffinityInfo> fut = affMap.get(key); - - if (fut != null) - return fut.get(); - - ClusterNode loc = ctx.discovery().localNode(); - - // Check local node. - if (U.hasCache(loc, cacheName)) { - GridCacheContext<Object,Object> cctx = ctx.cache().internalCache(cacheName).context(); - - AffinityInfo info = new AffinityInfo( - cctx.config().getAffinity(), - cctx.config().getAffinityMapper(), - new GridAffinityAssignment(topVer, cctx.affinity().assignments(topVer)), - cctx.portableEnabled()); - - IgniteFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(ctx, info)); - - if (old != null) - info = old.get(); - - return info; - } - - Collection<ClusterNode> cacheNodes = F.view( - ctx.discovery().remoteNodes(), - new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode n) { - return U.hasCache(n, cacheName); - } - }); - - if (F.isEmpty(cacheNodes)) - return null; - - GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>(); - - IgniteFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); - - if (old != null) - return old.get(); - - int max = ERROR_RETRIES; - int cnt = 0; - - Iterator<ClusterNode> it = cacheNodes.iterator(); - - // We are here because affinity has not been fetched yet, or cache mode is LOCAL. - while (true) { - cnt++; - - if (!it.hasNext()) - it = cacheNodes.iterator(); - - // Double check since we deal with dynamic view. - if (!it.hasNext()) - // Exception will be caught in this method. - throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName); - - ClusterNode n = it.next(); - - GridCacheMode mode = U.cacheMode(n, cacheName); - - assert mode != null; - - // Map all keys to a single node, if the cache mode is LOCAL. - if (mode == LOCAL) { - fut0.onDone(new IgniteCheckedException("Failed to map keys for LOCAL cache.")); - - // Will throw exception. - fut0.get(); - } - - try { - // Resolve cache context for remote node. - // Set affinity function before counting down on latch. - fut0.onDone(affinityInfoFromNode(cacheName, topVer, n)); - - break; - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to get affinity from node (will retry) [cache=" + cacheName + - ", node=" + U.toShortString(n) + ", msg=" + e.getMessage() + ']'); - - if (cnt < max) { - U.sleep(ERROR_WAIT); - - continue; - } - - affMap.remove(maskNull(cacheName), fut0); - - fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e)); - - break; - } - catch (RuntimeException | Error e) { - fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e)); - - break; - } - } - - return fut0.get(); - } - - /** - * Requests {@link GridCacheAffinityFunction} and {@link org.apache.ignite.cache.affinity.GridCacheAffinityKeyMapper} from remote node. - * - * @param cacheName Name of cache on which affinity is requested. - * @param n Node from which affinity is requested. - * @return Affinity cached function. - * @throws IgniteCheckedException If either local or remote node cannot get deployment for affinity objects. - */ - private AffinityInfo affinityInfoFromNode(@Nullable String cacheName, long topVer, ClusterNode n) - throws IgniteCheckedException { - GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure() - .callAsyncNoFailover(BALANCE, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/).get(); - - GridCacheAffinityFunction f = (GridCacheAffinityFunction)unmarshall(ctx, n.id(), t.get1()); - GridCacheAffinityKeyMapper m = (GridCacheAffinityKeyMapper)unmarshall(ctx, n.id(), t.get2()); - - assert m != null; - - // Bring to initial state. - f.reset(); - m.reset(); - - Boolean portableEnabled = U.portableEnabled(n, cacheName); - - return new AffinityInfo(f, m, t.get3(), portableEnabled != null && portableEnabled); - } - - /** - * @param aff Affinity function. - * @param keys Keys. - * @return Affinity map. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings({"unchecked"}) - private <K> Map<ClusterNode, Collection<K>> affinityMap(AffinityInfo aff, Collection<? extends K> keys) - throws IgniteCheckedException { - assert aff != null; - assert !F.isEmpty(keys); - - try { - if (keys.size() == 1) - return Collections.singletonMap(primary(aff, F.first(keys)), (Collection<K>)keys); - - Map<ClusterNode, Collection<K>> map = new GridLeanMap<>(); - - for (K k : keys) { - ClusterNode n = primary(aff, k); - - Collection<K> mapped = map.get(n); - - if (mapped == null) - map.put(n, mapped = new LinkedList<>()); - - mapped.add(k); - } - - return map; - } - catch (IgniteException e) { - // Affinity calculation may lead to IgniteException if no cache nodes found for pair cacheName+topVer. - throw new IgniteCheckedException("Failed to get affinity map for keys: " + keys, e); - } - } - - /** - * Get primary node for cached key. - * - * @param aff Affinity function. - * @param key Key to check. - * @return Primary node for given key. - * @throws IgniteCheckedException In case of error. - */ - private <K> ClusterNode primary(AffinityInfo aff, K key) throws IgniteCheckedException { - int part = aff.affFunc.partition(aff.mapper.affinityKey(key)); - - Collection<ClusterNode> nodes = aff.assignment.get(part); - - if (F.isEmpty(nodes)) - throw new IgniteCheckedException("Failed to get affinity nodes [aff=" + aff + ", key=" + key + ']'); - - return nodes.iterator().next(); - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - X.println(">>>"); - X.println(">>> Affinity processor memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> affMapSize: " + affMap.size()); - } - - /** - * - */ - private static class AffinityInfo { - /** Affinity function. */ - private GridCacheAffinityFunction affFunc; - - /** Mapper */ - private GridCacheAffinityKeyMapper mapper; - - /** Assignment. */ - private GridAffinityAssignment assignment; - - /** Portable enabled flag. */ - private boolean portableEnabled; - - /** - * @param affFunc Affinity function. - * @param mapper Affinity key mapper. - * @param assignment Partition assignment. - * @param portableEnabled Portable enabled flag. - */ - private AffinityInfo(GridCacheAffinityFunction affFunc, GridCacheAffinityKeyMapper mapper, - GridAffinityAssignment assignment, boolean portableEnabled) { - this.affFunc = affFunc; - this.mapper = mapper; - this.assignment = assignment; - this.portableEnabled = portableEnabled; - } - } - - /** - * - */ - private static class AffinityAssignmentKey { - /** */ - private String cacheName; - - /** */ - private long topVer; - - /** - * @param cacheName Cache name. - * @param topVer Topology version. - */ - private AffinityAssignmentKey(String cacheName, long topVer) { - this.cacheName = cacheName; - this.topVer = topVer; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof AffinityAssignmentKey)) - return false; - - AffinityAssignmentKey that = (AffinityAssignmentKey)o; - - return topVer == that.topVer && F.eq(cacheName, that.cacheName); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = cacheName != null ? cacheName.hashCode() : 0; - - res = 31 * res + (int)(topVer ^ (topVer >>> 32)); - - return res; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java deleted file mode 100644 index fe8a4b8..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityUtils.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.affinity; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.processors.task.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Affinity utility methods. - */ -class GridAffinityUtils { - /** - * Creates a job that will look up {@link org.apache.ignite.cache.affinity.GridCacheAffinityKeyMapper} and {@link org.apache.ignite.cache.affinity.GridCacheAffinityFunction} on a - * cache with given name. If they exist, this job will serialize and transfer them together with all deployment - * information needed to unmarshal objects on remote node. Result is returned as a {@link GridTuple3}, - * where first object is {@link GridAffinityMessage} for {@link org.apache.ignite.cache.affinity.GridCacheAffinityFunction}, second object - * is {@link GridAffinityMessage} for {@link org.apache.ignite.cache.affinity.GridCacheAffinityKeyMapper} and third object is affinity assignment - * for given topology version. - * - * @param cacheName Cache name. - * @return Affinity job. - */ - static Callable<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> affinityJob( - String cacheName, long topVer) { - return new AffinityJob(cacheName, topVer); - } - - /** - * @param ctx {@code GridKernalContext} instance which provides deployment manager - * @param o Object for which deployment should be obtained. - * @return Deployment object for given instance, - * @throws IgniteCheckedException If node cannot create deployment for given object. - */ - private static GridAffinityMessage affinityMessage(GridKernalContext ctx, Object o) throws IgniteCheckedException { - Class cls = o.getClass(); - - GridDeployment dep = ctx.deploy().deploy(cls, cls.getClassLoader()); - - if (dep == null) - throw new IgniteDeploymentException("Failed to deploy affinity object with class: " + cls.getName()); - - return new GridAffinityMessage( - ctx.config().getMarshaller().marshal(o), - cls.getName(), - dep.classLoaderId(), - dep.deployMode(), - dep.userVersion(), - dep.participants()); - } - - /** - * Unmarshalls transfer object from remote node within a given context. - * - * @param ctx Grid kernal context that provides deployment and marshalling services. - * @param sndNodeId {@link UUID} of the sender node. - * @param msg Transfer object that contains original serialized object and deployment information. - * @return Unmarshalled object. - * @throws IgniteCheckedException If node cannot obtain deployment. - */ - static Object unmarshall(GridKernalContext ctx, UUID sndNodeId, GridAffinityMessage msg) - throws IgniteCheckedException { - GridDeployment dep = ctx.deploy().getGlobalDeployment( - msg.deploymentMode(), - msg.sourceClassName(), - msg.sourceClassName(), - msg.userVersion(), - sndNodeId, - msg.classLoaderId(), - msg.loaderParticipants(), - null); - - if (dep == null) - throw new IgniteDeploymentException("Failed to obtain affinity object (is peer class loading turned on?): " + - msg); - - Object src = ctx.config().getMarshaller().unmarshal(msg.source(), dep.classLoader()); - - // Resource injection. - ctx.resource().inject(dep, dep.deployedClass(msg.sourceClassName()), src); - - return src; - } - - /** Ensure singleton. */ - private GridAffinityUtils() { - // No-op. - } - - /** - * - */ - @GridInternal - private static class AffinityJob implements - Callable<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>>, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - @IgniteLoggerResource - private IgniteLogger log; - - /** */ - private String cacheName; - - /** */ - private long topVer; - - /** - * @param cacheName Cache name. - */ - private AffinityJob(@Nullable String cacheName, long topVer) { - this.cacheName = cacheName; - this.topVer = topVer; - } - - /** - * - */ - public AffinityJob() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> call() - throws Exception { - assert ignite != null; - assert log != null; - - GridKernal kernal = ((GridKernal) ignite); - - GridCacheContext<Object, Object> cctx = kernal.internalCache(cacheName).context(); - - assert cctx != null; - - GridKernalContext ctx = kernal.context(); - - return F.t( - affinityMessage(ctx, cctx.config().getAffinity()), - affinityMessage(ctx, cctx.config().getAffinityMapper()), - new GridAffinityAssignment(topVer, cctx.affinity().assignments(topVer))); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeLong(topVer); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - topVer = in.readLong(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java deleted file mode 100644 index a336825..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridCacheAffinityFunctionContextImpl.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.affinity; - -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Cache affinity function context implementation. Simple bean that holds all required fields. - */ -public class GridCacheAffinityFunctionContextImpl implements GridCacheAffinityFunctionContext { - /** Topology snapshot. */ - private List<ClusterNode> topSnapshot; - - /** Previous affinity assignment. */ - private List<List<ClusterNode>> prevAssignment; - - /** Discovery event that caused this topology change. */ - private IgniteDiscoveryEvent discoEvt; - - /** Topology version. */ - private long topVer; - - /** Number of backups to assign. */ - private int backups; - - /** - * @param topSnapshot Topology snapshot. - * @param topVer Topology version. - */ - public GridCacheAffinityFunctionContextImpl(List<ClusterNode> topSnapshot, List<List<ClusterNode>> prevAssignment, - IgniteDiscoveryEvent discoEvt, long topVer, int backups) { - this.topSnapshot = topSnapshot; - this.prevAssignment = prevAssignment; - this.discoEvt = discoEvt; - this.topVer = topVer; - this.backups = backups; - } - - /** {@inheritDoc} */ - @Nullable @Override public List<ClusterNode> previousAssignment(int part) { - return prevAssignment.get(part); - } - - /** {@inheritDoc} */ - @Override public List<ClusterNode> currentTopologySnapshot() { - return topSnapshot; - } - - /** {@inheritDoc} */ - @Override public long currentTopologyVersion() { - return topVer; - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteDiscoveryEvent discoveryEvent() { - return discoEvt; - } - - /** {@inheritDoc} */ - @Override public int backups() { - return backups; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/package.html b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/package.html deleted file mode 100644 index 5bb1751..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Data affinity processor. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityManager.java index 02b1b78..7efea6c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityManager.java @@ -22,7 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.affinity.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java index 2acb97d..1b1dade 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java @@ -42,7 +42,7 @@ import org.gridgain.grid.kernal.processors.cache.local.*; import org.gridgain.grid.kernal.processors.cache.query.*; import org.gridgain.grid.kernal.processors.cache.query.continuous.*; import org.gridgain.grid.kernal.processors.cache.transactions.*; -import org.gridgain.grid.kernal.processors.closure.*; +import org.apache.ignite.internal.processors.closure.*; import org.apache.ignite.internal.processors.offheap.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.plugin.security.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java index 6d05d9d..40102ff 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java @@ -24,7 +24,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.continuous.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java index e4d38f9..aca69c0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.internal.managers.deployment.*; import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.continuous.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosurePolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosurePolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosurePolicy.java deleted file mode 100644 index d5b7687..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosurePolicy.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.closure; - -import org.jetbrains.annotations.*; - -/** - * This enumeration defines different types of closure - * processing by the closure processor. - */ -public enum GridClosurePolicy { - /** Public execution pool. */ - PUBLIC_POOL, - - /** P2P execution pool. */ - P2P_POOL, - - /** System execution pool. */ - SYSTEM_POOL, - - /** GGFS pool. */ - GGFS_POOL; - - /** Enum values. */ - private static final GridClosurePolicy[] VALS = values(); - - /** - * Efficiently gets enumerated value from its ordinal. - * - * @param ord Ordinal value. - * @return Enumerated value. - */ - @Nullable public static GridClosurePolicy fromOrdinal(int ord) { - return ord >= 0 && ord < VALS.length ? VALS[ord] : null; - } -}
