http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java deleted file mode 100644 index 6d18b7d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheKeySet.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import javax.cache.Cache; -import org.apache.ignite.internal.util.GridSerializableSet; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.lang.IgnitePredicate; - -/** - * Key set based on provided entries with all remove operations backed - * by underlying cache. - */ -public class GridCacheKeySet<K, V> extends GridSerializableSet<K> { - /** */ - private static final long serialVersionUID = 0L; - - /** Cache context. */ - private final GridCacheContext<K, V> ctx; - - /** Filter. */ - private final IgnitePredicate<Cache.Entry<K, V>>[] filter; - - /** Base map. */ - private final Map<K, Cache.Entry<K, V>> map; - - /** - * @param ctx Cache context. - * @param c Entry collection. - * @param filter Filter. - */ - public GridCacheKeySet(GridCacheContext<K, V> ctx, Collection<? extends Cache.Entry<K, V>> c, - IgnitePredicate<Cache.Entry<K, V>>[] filter) { - map = new HashMap<>(); - - assert ctx != null; - - this.ctx = ctx; - this.filter = filter == null ? CU.<K, V>empty() : filter; - - for (Cache.Entry<K, V> e : c) { - if (e != null) - map.put(e.getKey(), e); - } - } - - /** {@inheritDoc} */ - @Override public Iterator<K> iterator() { - return new GridCacheIterator<>(ctx, map.values(), F.<K, V>cacheEntry2Key(), filter); - } - - /** {@inheritDoc} */ - @Override public void clear() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"SuspiciousMethodCalls"}) - @Override public boolean remove(Object o) { - Cache.Entry<K, V> e = map.get(o); - - if (e == null || !F.isAll(e, filter)) - return false; - - map.remove(o); - - ctx.grid().cache(ctx.name()).remove(e.getKey()); - - return true; - } - - /** {@inheritDoc} */ - @Override public int size() { - return F.size(map.values(), filter); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"SuspiciousMethodCalls"}) - @Override public boolean contains(Object o) { - Cache.Entry<K, V> e = map.get(o); - - return e != null && F.isAll(e, filter); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 75d96d8..c9ff138 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3192,7 +3192,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (rmv) { onMarkedObsolete(); - cctx.cache().map().removeEntry(this); + cctx.cache().removeEntry(this); } } } @@ -4229,7 +4229,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (rmv) { onMarkedObsolete(); - cctx.cache().map().removeEntry(this); + cctx.cache().removeEntry(this); } } @@ -4287,18 +4287,32 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme flags |= IS_DELETED_MASK; - cctx.decrementPublicSize(this); + decrementMapPublicSize(); } else { assert deletedUnlocked() : this; flags &= ~IS_DELETED_MASK; - cctx.incrementPublicSize(this); + incrementMapPublicSize(); } } /** + * Increments public size of map. + */ + protected void incrementMapPublicSize() { + cctx.incrementPublicSize(this); + } + + /** + * Decrements public size of map. + */ + protected void decrementMapPublicSize() { + cctx.decrementPublicSize(this); + } + + /** * @return MVCC. */ @Nullable protected GridCacheMvcc mvccExtras() { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5214078..3d5052b 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -79,7 +79,6 @@ import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridNoStorageCacheMap; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 84c5d73..a58c209 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -891,7 +891,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ - @Override public Collection<V> values() { + @Override public Iterable<V> values() { CacheOperationContext prev = gate.enter(opCtx); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 98b1b59..e7010f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -51,7 +51,6 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.store.CacheStoreSessionListener; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.GridKernalContext; http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java new file mode 100644 index 0000000..2532882 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collections; +import java.util.Set; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtOffHeapCacheEntry; +import org.jetbrains.annotations.Nullable; + +/** + * Empty cache map that will never store any entries. + */ +public class GridNoStorageCacheMap implements GridCacheConcurrentMap { + /** Context. */ + private final GridCacheContext ctx; + + /** + * @param ctx Cache context. + */ + public GridNoStorageCacheMap(GridCacheContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) { + return null; + } + + /** {@inheritDoc} */ + @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key, + @Nullable CacheObject val, boolean create, boolean touch) { + if (create) + return ctx.useOffheapEntry() ? + new GridDhtOffHeapCacheEntry(ctx, topVer, key, key.hashCode(), val) : + new GridDhtCacheEntry(ctx, topVer, key, key.hashCode(), val); + else + return null; + } + + /** {@inheritDoc} */ + @Override public boolean removeEntry(GridCacheEntryEx entry) { + throw new AssertionError(); + } + + /** {@inheritDoc} */ + @Override public int size() { + return 0; + } + + /** {@inheritDoc} */ + @Override public int publicSize() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void incrementPublicSize(GridCacheEntryEx e) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void decrementPublicSize(GridCacheEntryEx e) { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheMapEntry randomEntry() { + return null; + } + + /** {@inheritDoc} */ + @Override public Set<KeyCacheObject> keySet(CacheEntryPredicate... filter) { + return Collections.emptySet(); + } + + /** {@inheritDoc} */ + @Override public Iterable<GridCacheMapEntry> entries(CacheEntryPredicate... filter) { + return Collections.emptySet(); + } + + /** {@inheritDoc} */ + @Override public Iterable<GridCacheMapEntry> allEntries(CacheEntryPredicate... filter) { + return Collections.emptySet(); + } + + /** {@inheritDoc} */ + @Override public Set<GridCacheMapEntry> entrySet(CacheEntryPredicate... filter) { + return Collections.emptySet(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 5294f6a..3dc3471 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -911,7 +911,7 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { * * @return Collection of cached values. */ - public Collection<V> values(); + public Iterable<V> values(); /** * Gets set of all entries cached on this node. You can remove http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java index c214ce3..ffb846c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java @@ -30,4 +30,15 @@ public interface KeyCacheObject extends CacheObject { * @return {@code True} if internal cache key. */ public boolean internal(); + + /** + * @return Partition ID for this key or -1 if it is unknown. + */ + public int partition(); + + /** + * Sets partition ID for this key. + * @param part Partition ID. + */ + public void partition(int part); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index e557c28..35e681c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; import org.jetbrains.annotations.Nullable; /** @@ -27,6 +28,10 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb /** */ private static final long serialVersionUID = 0L; + /** */ + @GridDirectTransient + private int part = -1; + /** * */ @@ -39,10 +44,30 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb * @param valBytes Value bytes. */ public KeyCacheObjectImpl(Object val, byte[] valBytes) { + this(val, valBytes, -1); + } + + /** + * @param val Value. + * @param valBytes Value bytes. + * @param part Partition. + */ + public KeyCacheObjectImpl(Object val, byte[] valBytes, int part) { assert val != null; this.val = val; this.valBytes = valBytes; + this.part = part; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return part; + } + + /** {@inheritDoc} */ + @Override public void partition(int part) { + this.part = part; } /** {@inheritDoc} */ @@ -96,6 +121,11 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb } /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { if (valBytes == null) valBytes = ctx.kernalContext().cacheObjects().marshal(ctx, val); @@ -119,4 +149,4 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb return val.equals(other.val); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 18e2d09..c043bc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -773,20 +773,36 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm /** {@inheritDoc} */ @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) { + return toCacheKeyObject(ctx, obj, userObj, -1); + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject toCacheKeyObject( + CacheObjectContext ctx, + Object obj, + boolean userObj, + int partition + ) { if (!((CacheObjectBinaryContext)ctx).binaryEnabled()) - return super.toCacheKeyObject(ctx, obj, userObj); + return super.toCacheKeyObject(ctx, obj, userObj, partition); + + if (obj instanceof KeyCacheObject) { + ((KeyCacheObject)obj).partition(partition); - if (obj instanceof KeyCacheObject) return (KeyCacheObject)obj; + } if (((CacheObjectBinaryContext)ctx).binaryEnabled()) { obj = toBinary(obj); - if (obj instanceof KeyCacheObject) + if (obj instanceof KeyCacheObject) { + ((KeyCacheObject)obj).partition(partition); + return (KeyCacheObject)obj; + } } - return toCacheKeyObject0(obj, userObj); + return toCacheKeyObject0(obj, userObj, partition); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 6e97ec5..03f6474 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -405,7 +405,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter try { if (!locPart.isEmpty()) { - for (GridDhtCacheEntry o : locPart.entries()) { + for (GridCacheEntryEx o : locPart.allEntries()) { if (!o.obsoleteOrDeleted()) dataLdr.removeDataInternal(o.key()); } @@ -428,7 +428,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter if (near != null) { GridCacheVersion obsoleteVer = ctx.versions().next(); - for (GridCacheEntryEx e : near.map().allEntries0()) { + for (GridCacheEntryEx e : near.allEntries()) { if (!e.valid(topVer) && e.markObsolete(obsoleteVer)) near.removeEntry(e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index 5d07b6f..20cd52c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -78,6 +78,10 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { @GridDirectCollection(KeyCacheObject.class) private List<KeyCacheObject> keys; + /** Partition IDs of keys to lock. */ + @GridDirectCollection(int.class) + protected List<Integer> partIds; + /** Array indicating whether value should be returned for a key. */ @GridToStringInclude private boolean[] retVals; @@ -232,7 +236,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { * * @param skipStore Skip store flag. */ - private void skipStore(boolean skipStore){ + private void skipStore(boolean skipStore) { flags = skipStore ? (byte)(flags | SKIP_STORE_FLAG_MASK) : (byte)(flags & ~SKIP_STORE_FLAG_MASK); } @@ -284,11 +288,15 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { boolean retVal, GridCacheContext ctx ) throws IgniteCheckedException { - if (keys == null) + if (keys == null) { keys = new ArrayList<>(keysCount()); + partIds = new ArrayList<>(keysCount()); + } keys.add(key); + partIds.add(key.partition()); + retVals[idx] = retVal; idx++; @@ -325,6 +333,13 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { GridCacheContext cctx = ctx.cacheContext(cacheId); finishUnmarshalCacheObjects(keys, cctx, ldr); + + if (partIds != null && !partIds.isEmpty()) { + assert partIds.size() == keys.size(); + + for (int i = 0; i < keys.size(); i++) + keys.get(i).partition(partIds.get(i)); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 2cf7276..2bb6bf3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java index 213a0ff..7854ace 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java @@ -44,6 +44,10 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage { @GridDirectCollection(KeyCacheObject.class) private List<KeyCacheObject> keys; + /** Partition IDs. */ + @GridDirectCollection(int.class) + protected List<Integer> partIds; + /** * Empty constructor required by {@link Externalizable}. */ @@ -75,10 +79,13 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage { * @throws IgniteCheckedException If failed. */ public void addKey(KeyCacheObject key, GridCacheContext ctx) throws IgniteCheckedException { - if (keys == null) + if (keys == null) { keys = new ArrayList<>(keysCount()); + partIds = new ArrayList<>(keysCount()); + } keys.add(key); + partIds.add(key.partition()); } /** {@inheritDoc} @@ -94,6 +101,13 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage { super.finishUnmarshal(ctx, ldr); finishUnmarshalCacheObjects(keys, ctx.cacheContext(cacheId), ldr); + + if (partIds != null && !partIds.isEmpty()) { + assert partIds.size() == keys.size(); + + for (int i = 0; i < keys.size(); i++) + keys.get(i).partition(partIds.get(i)); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java new file mode 100644 index 0000000..ff95de9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.PartitionedReadOnlySet; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * An implementation of GridCacheConcurrentMap that will delegate all method calls to corresponding local partition. + */ +public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap { + /** Context. */ + private final GridCacheContext ctx; + + /** + * Constructor. + * @param ctx Context. + */ + public GridCachePartitionedConcurrentMap(GridCacheContext ctx) { + this.ctx = ctx; + } + + /** + * @param key Key. + * @param topVer Topology version. + * @param create Create flag. + * @return Local partition. + */ + @Nullable private GridDhtLocalPartition localPartition( + KeyCacheObject key, + AffinityTopologyVersion topVer, + boolean create + ) { + int p = key.partition(); + + if (p == -1) + p = ctx.affinity().partition(key); + + return ctx.topology().localPartition(p, topVer, create); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) { + GridDhtLocalPartition part = localPartition(key, AffinityTopologyVersion.NONE, false); + + if (part == null) + return null; + + return part.getEntry(key); + } + + /** {@inheritDoc} */ + @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key, + @Nullable CacheObject val, boolean create, boolean touch) { + GridDhtLocalPartition part = localPartition(key, topVer, create); + + if (part == null) + return null; + + return part.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch); + } + + /** {@inheritDoc} */ + @Override public int size() { + int size = 0; + + for (GridDhtLocalPartition part : ctx.topology().localPartitions()) + size += part.size(); + + return size; + } + + /** {@inheritDoc} */ + @Override public int publicSize() { + int size = 0; + + for (GridDhtLocalPartition part : ctx.topology().localPartitions()) + size += part.publicSize(); + + return size; + } + + /** {@inheritDoc} */ + @Override public void incrementPublicSize(GridCacheEntryEx e) { + localPartition(e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e); + } + + /** {@inheritDoc} */ + @Override public void decrementPublicSize(GridCacheEntryEx e) { + localPartition(e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e); + } + + /** {@inheritDoc} */ + @Override public boolean removeEntry(GridCacheEntryEx entry) { + GridDhtLocalPartition part = localPartition(entry.key(), AffinityTopologyVersion.NONE, false); + + if (part == null) + return false; + + return part.removeEntry(entry); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheMapEntry randomEntry() { + return entries().iterator().next(); + } + + /** {@inheritDoc} */ + @Override public Set<KeyCacheObject> keySet(CacheEntryPredicate... filter) { + Collection<Set<KeyCacheObject>> sets = new ArrayList<>(); + + for (GridDhtLocalPartition partition : ctx.topology().localPartitions()) + sets.add(partition.keySet(filter)); + + return new PartitionedReadOnlySet<>(sets); + } + + /** {@inheritDoc} */ + @Override public Iterable<GridCacheMapEntry> entries(final CacheEntryPredicate... filter) { + return new Iterable<GridCacheMapEntry>() { + @Override public Iterator<GridCacheMapEntry> iterator() { + List<Iterator<GridCacheMapEntry>> iterators = new ArrayList<>(); + + for (GridDhtLocalPartition partition : ctx.topology().localPartitions()) + iterators.add(partition.entries(filter).iterator()); + + return F.flatIterators(iterators); + } + }; + } + + /** {@inheritDoc} */ + @Override public Iterable<GridCacheMapEntry> allEntries(final CacheEntryPredicate... filter) { + return new Iterable<GridCacheMapEntry>() { + @Override public Iterator<GridCacheMapEntry> iterator() { + List<Iterator<GridCacheMapEntry>> iterators = new ArrayList<>(); + + for (GridDhtLocalPartition partition : ctx.topology().localPartitions()) + iterators.add(partition.allEntries(filter).iterator()); + + return F.flatIterators(iterators); + } + }; + } + + /** {@inheritDoc} */ + @Override public Set<GridCacheMapEntry> entrySet(CacheEntryPredicate... filter) { + Collection<Set<GridCacheMapEntry>> sets = new ArrayList<>(); + + for (GridDhtLocalPartition partition : ctx.topology().localPartitions()) + sets.add(partition.entrySet(filter)); + + return new PartitionedReadOnlySet<>(sets); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCachePartitionedConcurrentMap.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 3761d77..4635cad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -357,13 +357,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e) { - assert false : "Entry should not be added to client topology: " + e; - - return null; - } - - /** {@inheritDoc} */ @Override public void onRemoved(GridDhtCacheEntry e) { assert false : "Entry should not be removed from client topology: " + e; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index faa980e..5ff674f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -158,9 +158,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param ctx Context. */ protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) { - super(ctx, ctx.config().getStartSize()); - - top = new GridDhtPartitionTopologyImpl(ctx); + this(ctx, new GridCachePartitionedConcurrentMap(ctx)); } /** @@ -171,27 +169,13 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap */ protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { super(ctx, map); - - top = new GridDhtPartitionTopologyImpl(ctx); } /** {@inheritDoc} */ @Override protected void init() { - map.setEntryFactory(new GridCacheMapEntryFactory() { - /** {@inheritDoc} */ - @Override public GridCacheMapEntry create( - GridCacheContext ctx, - AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val - ) { - if (ctx.useOffheapEntry()) - return new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash, val); + super.init(); - return new GridDhtCacheEntry(ctx, topVer, key, hash, val); - } - }); + top = new GridDhtPartitionTopologyImpl(ctx, entryFactory()); } /** {@inheritDoc} */ @@ -252,6 +236,26 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** + * @return Cache map entry factory. + */ + protected GridCacheMapEntryFactory entryFactory() { + return new GridCacheMapEntryFactory() { + @Override public GridCacheMapEntry create( + GridCacheContext ctx, + AffinityTopologyVersion topVer, + KeyCacheObject key, + int hash, + CacheObject val + ) { + if (ctx.useOffheapEntry()) + return new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash, val); + + return new GridDhtCacheEntry(ctx, topVer, key, hash, val); + } + }; + } + + /** * @return Near cache. */ public abstract GridNearCacheAdapter<K, V> near(); @@ -404,7 +408,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ - @Override public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { + @Override public GridCacheEntryEx entryEx(KeyCacheObject key, + AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { return super.entryEx(key, topVer); } @@ -423,7 +428,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @return DHT entry. * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. */ - public GridDhtCacheEntry entryExx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { + public GridDhtCacheEntry entryExx(KeyCacheObject key, + AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { return (GridDhtCacheEntry)entryEx(key, topVer); } @@ -640,7 +646,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable IgniteCacheExpiryPolicy expiry, boolean skipVals, boolean canRemap - ) { + ) { return getAllAsync0(keys, readThrough, /*don't check local tx. */false, @@ -1084,7 +1090,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap final GridDhtLocalPartition part = ctx.topology().localPartition(partId, ctx.discovery().topologyVersionEx(), false); - Iterator<GridDhtCacheEntry> partIt = part == null ? null : part.entries().iterator(); + Iterator<GridCacheMapEntry> partIt = part == null ? null : part.entries().iterator(); return new PartitionEntryIterator(partIt); } @@ -1152,7 +1158,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** {@inheritDoc} */ - @Override public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near, boolean readers) { + @Override public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near, + boolean readers) { return ctx.affinityNode() ? super.splitClearLocally(srv, near, readers) : Collections.<GridCacheClearAllRunnable<K, V>>emptyList(); } @@ -1164,8 +1171,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap GridDhtLocalPartition part = topology().localPartition(entry.partition(), AffinityTopologyVersion.NONE, false); - // Do not remove entry on replica topology. Instead, add entry to removal queue. - // It will be cleared eventually. if (part != null) { try { part.onDeferredDelete(entry.key(), ver); @@ -1211,16 +1216,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap assert primary || backup; if (primary && backup) - return iterator(map.entries0().iterator(), !ctx.keepBinary()); + return iterator(entries().iterator(), !ctx.keepBinary()); else { final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator(); - Iterator<GridCacheEntryEx> it = new Iterator<GridCacheEntryEx>() { - private GridCacheEntryEx next; + Iterator<GridCacheMapEntry> it = new Iterator<GridCacheMapEntry>() { + private GridCacheMapEntry next; - private Iterator<GridDhtCacheEntry> curIt; + private Iterator<GridCacheMapEntry> curIt; { advance(); @@ -1230,11 +1235,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap return next != null; } - @Override public GridCacheEntryEx next() { + @Override public GridCacheMapEntry next() { if (next == null) throw new NoSuchElementException(); - GridCacheEntryEx e = next; + GridCacheMapEntry e = next; advance(); @@ -1293,12 +1298,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap private Cache.Entry<K, V> last; /** Partition iterator. */ - private final Iterator<GridDhtCacheEntry> partIt; + private final Iterator<GridCacheMapEntry> partIt; /** * @param partIt Partition iterator. */ - private PartitionEntryIterator(@Nullable Iterator<GridDhtCacheEntry> partIt) { + private PartitionEntryIterator(@Nullable Iterator<GridCacheMapEntry> partIt) { this.partIt = partIt; advance(); @@ -1335,9 +1340,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap private void advance() { if (partIt != null) { while (partIt.hasNext()) { - GridDhtCacheEntry next = partIt.next(); + GridCacheEntryEx next = partIt.next(); - if (next.isInternal() || !next.visitable(CU.empty0())) + if (next instanceof GridCacheMapEntry && (!((GridCacheMapEntry)next).visitable(CU.empty0()))) continue; entry = next.wrapLazyValue(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index ab51bdb..95ef10b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -88,7 +88,11 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { super(ctx, key, hash, val); // Record this entry with partition. - locPart = ctx.dht().topology().onAdded(topVer, this); + int p = cctx.affinity().partition(key); + + locPart = ctx.topology().localPartition(p, topVer, true); + + assert locPart != null; } /** {@inheritDoc} */ @@ -182,8 +186,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { boolean reenter, boolean tx, boolean implicitSingle) - throws GridCacheEntryRemovedException, GridDistributedLockCancelledException - { + throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { assert serReadVer == null || serOrder != null; assert !reenter || serOrder == null; @@ -331,7 +334,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { * @throws GridCacheEntryRemovedException If entry has been removed. */ @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext"}) - @Nullable public synchronized IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue(AffinityTopologyVersion topVer) + @Nullable public synchronized IgniteBiTuple<GridCacheVersion, CacheObject> versionedValue( + AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException { if (isNew() || !valid(AffinityTopologyVersion.NONE) || deletedUnlocked()) return null; @@ -599,7 +603,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } finally { if (rmv) - cctx.cache().removeIfObsolete(key); // Clear cache. + cctx.cache().removeEntry(this); // Clear cache. } } @@ -713,6 +717,16 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { return S.toString(GridDhtCacheEntry.class, this, "super", super.toString()); } + /** {@inheritDoc} */ + @Override protected void incrementMapPublicSize() { + locPart.incrementPublicSize(this); + } + + /** {@inheritDoc} */ + @Override protected void decrementMapPublicSize() { + locPart.decrementPublicSize(this); + } + /** * Reader ID. */ @@ -793,7 +807,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { return txFut; } - /** {@inheritDoc} */ @Override public boolean equals(Object o) { if (this == o) http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 4fc1eaf..3ac4a41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -17,12 +17,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -33,8 +31,15 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; +import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; @@ -53,8 +58,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; -import org.jsr166.ConcurrentHashMap8; -import org.jsr166.LongAdder8; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED; @@ -66,7 +70,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Key partition. */ -public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, GridReservable { +public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, GridReservable, GridCacheConcurrentMap { /** Maximum size for delete queue. */ public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000); @@ -89,7 +93,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, private final GridFutureAdapter<?> rent; /** Entries map. */ - private final ConcurrentMap<KeyCacheObject, GridDhtCacheEntry> map; + private final GridCacheConcurrentMap map; /** Context. */ private final GridCacheContext cctx; @@ -104,9 +108,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, /** Lock. */ private final ReentrantLock lock = new ReentrantLock(); - /** Public size counter. */ - private final LongAdder8 mapPubSize = new LongAdder8(); - /** Remove queue. */ private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue; @@ -120,8 +121,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, * @param cctx Context. * @param id Partition ID. */ - @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - GridDhtLocalPartition(GridCacheContext cctx, int id) { + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") GridDhtLocalPartition(GridCacheContext cctx, int id, + GridCacheMapEntryFactory entryFactory) { assert cctx != null; this.id = id; @@ -135,8 +136,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } }; - map = new ConcurrentHashMap8<>(cctx.config().getStartSize() / - cctx.affinity().partitions()); + map = new GridCacheConcurrentMapImpl(cctx, entryFactory, cctx.config().getStartSize() / cctx.affinity().partitions()); int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 : Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20); @@ -202,45 +202,30 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } /** - * @return Entries belonging to partition. - */ - public Collection<GridDhtCacheEntry> entries() { - return map.values(); - } - - /** * @return {@code True} if partition is empty. */ public boolean isEmpty() { - return map.isEmpty(); + return map.size() == 0; } - /** - * @return Number of entries in this partition (constant-time method). - */ - public int size() { + /** {@inheritDoc} */ + @Override public int size() { return map.size(); } - /** - * Increments public size of the map. - */ - public void incrementPublicSize() { - mapPubSize.increment(); + /** {@inheritDoc} */ + @Override public int publicSize() { + return map.publicSize(); } - /** - * Decrements public size of the map. - */ - public void decrementPublicSize() { - mapPubSize.decrement(); + /** {@inheritDoc} */ + @Override public void incrementPublicSize(GridCacheEntryEx e) { + map.incrementPublicSize(e); } - /** - * @return Number of public (non-internal) entries in this partition. - */ - public int publicSize() { - return mapPubSize.intValue(); + /** {@inheritDoc} */ + @Override public void decrementPublicSize(GridCacheEntryEx e) { + map.decrementPublicSize(e); } /** @@ -252,39 +237,57 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, return state == MOVING || state == OWNING || state == RENTING; } - /** - * @param entry Entry to add. - */ - void onAdded(GridDhtCacheEntry entry) { - GridDhtPartitionState state = state(); + /** {@inheritDoc} */ + @Override @Nullable public GridCacheMapEntry getEntry(KeyCacheObject key) { + return map.getEntry(key); + } - if (state == EVICTED) - throw new GridDhtInvalidPartitionException(id, "Adding entry to invalid partition " + - "(often may be caused by inconsistent 'key.hashCode()' implementation) [part=" + id + ']'); + /** {@inheritDoc} */ + @Override public boolean removeEntry(GridCacheEntryEx entry) { + return map.removeEntry(entry); + } - map.put(entry.key(), entry); + /** {@inheritDoc} */ + @Override public Iterable<GridCacheMapEntry> entries( + CacheEntryPredicate... filter) { + return map.entries(filter); + } - if (!entry.isInternal()) { - assert !entry.deleted() : entry; + /** {@inheritDoc} */ + @Override public Iterable<GridCacheMapEntry> allEntries(CacheEntryPredicate... filter) { + return map.allEntries(filter); + } - mapPubSize.increment(); - } + /** {@inheritDoc} */ + @Override public Set<GridCacheMapEntry> entrySet(CacheEntryPredicate... filter) { + return map.entrySet(filter); + } + + /** {@inheritDoc} */ + @Override @Nullable public GridCacheMapEntry randomEntry() { + return map.randomEntry(); + } + + /** {@inheritDoc} */ + @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent( + AffinityTopologyVersion topVer, KeyCacheObject key, + @Nullable CacheObject val, boolean create, boolean touch) { + return map.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch); + } + + /** {@inheritDoc} */ + @Override public Set<KeyCacheObject> keySet(CacheEntryPredicate... filter) { + return map.keySet(filter); } /** * @param entry Entry to remove. */ - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") void onRemoved(GridDhtCacheEntry entry) { assert entry.obsolete() : entry; // Make sure to remove exactly this entry. - synchronized (entry) { - map.remove(entry.key(), entry); - - if (!entry.isInternal() && !entry.deleted()) - mapPubSize.decrement(); - } + map.removeEntry(entry); // Attempt to evict. tryEvict(); @@ -312,7 +315,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, /** * Locks partition. */ - @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"}) + @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) public void lock() { lock.lock(); } @@ -338,11 +341,11 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, Map<KeyCacheObject, GridCacheVersion> evictHist0 = evictHist; - if (evictHist0 != null ) { + if (evictHist0 != null) { GridCacheVersion ver0 = evictHist0.get(key); if (ver0 == null || ver0.isLess(ver)) { - GridCacheVersion ver1 = evictHist0.put(key, ver); + GridCacheVersion ver1 = evictHist0.put(key, ver); assert ver1 == ver0; } @@ -366,7 +369,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, Map<KeyCacheObject, GridCacheVersion> evictHist0 = evictHist; - if (evictHist0 != null) { + if (evictHist0 != null) { GridCacheVersion ver0 = evictHist0.get(key); // Permit preloading if version in history @@ -489,7 +492,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, int ord = (int)(reservations >> 32); - if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) && + if (isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) && ord == RENTING.ordinal() && (reservations & 0xFFFF) == 0 && casState(reservations, EVICTED)) { if (log.isDebugEnabled()) @@ -538,7 +541,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, // Attempt to evict partition entries from cache. clearAll(); - if (map.isEmpty() && casState(reservations, EVICTED)) { + if (isEmpty() && casState(reservations, EVICTED)) { if (log.isDebugEnabled()) log.debug("Evicted partition: " + this); @@ -655,7 +658,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, boolean rec = cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); - Iterator<GridDhtCacheEntry> it = map.values().iterator(); + Iterator<GridDhtCacheEntry> it = (Iterator)map.allEntries().iterator(); GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> swapIt = null; @@ -684,11 +687,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, cached = it.next(); if (cached.clearInternal(clearVer, swap, extras)) { - map.remove(cached.key(), cached); + map.removeEntry(cached); if (!cached.isInternal()) { - mapPubSize.decrement(); - if (rec) { cctx.events().addEvent(cached.partition(), cached.key(), @@ -709,7 +710,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } } catch (GridDhtInvalidPartitionException e) { - assert map.isEmpty() && state() == EVICTED: "Invalid error [e=" + e + ", part=" + this + ']'; + assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; assert swapEmpty() : "Invalid error when swap is not cleared [e=" + e + ", part=" + this + ']'; break; // Partition is already concurrently cleared and evicted. @@ -789,7 +790,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } @Override public void remove() { - map.remove(lastEntry.key(), lastEntry); + map.removeEntry(lastEntry); } }; } @@ -811,7 +812,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } /** {@inheritDoc} */ - @SuppressWarnings( {"OverlyStrongTypeCast"}) + @SuppressWarnings({"OverlyStrongTypeCast"}) @Override public boolean equals(Object obj) { return obj instanceof GridDhtLocalPartition && (obj == this || ((GridDhtLocalPartition)obj).id() == id); } @@ -829,8 +830,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, return S.toString(GridDhtLocalPartition.class, this, "state", state(), "reservations", reservations(), - "empty", map.isEmpty(), - "createTime", U.format(createTime), - "mapPubSize", mapPubSize); + "empty", isEmpty(), + "createTime", U.format(createTime)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 50167d8..95c6dfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -419,6 +419,12 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { writer.incrementState(); + case 30: + if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + } return true; @@ -515,6 +521,14 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); + case 30: + partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridDhtLockRequest.class); @@ -527,7 +541,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 30; + return 31; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 7fba45d..2c483a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -195,13 +195,6 @@ public interface GridDhtPartitionTopology { public GridDhtPartitionFullMap partitionMap(boolean onlyActive); /** - * @param topVer Topology version. - * @param e Entry added to cache. - * @return Local partition. - */ - public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e); - - /** * @param e Entry removed from cache. */ public void onRemoved(GridDhtCacheEntry e); http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index f0ce6d1..9f5fbfb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -37,6 +36,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; @@ -50,7 +50,6 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; @@ -76,8 +75,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private final IgniteLogger log; /** */ - private final ConcurrentMap<Integer, GridDhtLocalPartition> locParts = - new ConcurrentHashMap8<>(); + private final GridDhtLocalPartition[] locParts; /** Node to partition map. */ private GridDhtPartitionFullMap node2part; @@ -103,6 +101,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** Lock. */ private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16); + /** */ + private final GridCacheMapEntryFactory entryFactory; + /** Partition update counter. */ private Map<Integer, Long> cntrMap = new HashMap<>(); @@ -112,12 +113,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** * @param cctx Context. */ - GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx) { + GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx, GridCacheMapEntryFactory entryFactory) { assert cctx != null; this.cctx = cctx; + this.entryFactory = entryFactory; log = cctx.logger(getClass()); + + locParts = new GridDhtLocalPartition[cctx.config().getAffinity().partitions()]; } /** @@ -149,7 +153,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** * @return Full map string representation. */ - @SuppressWarnings( {"ConstantConditions"}) + @SuppressWarnings({"ConstantConditions"}) private String fullMapString() { return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString(); } @@ -158,7 +162,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param map Map to get string for. * @return Full map string representation. */ - @SuppressWarnings( {"ConstantConditions"}) + @SuppressWarnings({"ConstantConditions"}) private String mapString(GridDhtPartitionMap2 map) { return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); } @@ -172,34 +176,65 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private boolean waitForRent() throws IgniteCheckedException { boolean changed = false; - // Synchronously wait for all renting partitions to complete. - for (Iterator<GridDhtLocalPartition> it = locParts.values().iterator(); it.hasNext();) { - GridDhtLocalPartition p = it.next(); + GridDhtLocalPartition[] locPartsCopy = new GridDhtLocalPartition[locParts.length]; - GridDhtPartitionState state = p.state(); + lock.readLock().lock(); + + try { + for (int i = 0; i < locParts.length; i++) + locPartsCopy[i] = locParts[i]; + } + finally { + lock.readLock().unlock(); + } + + GridDhtLocalPartition part; + + for (int i = 0; i < locPartsCopy.length; i++) { + part = locPartsCopy[i]; + + if (part == null) + continue; + + GridDhtPartitionState state = part.state(); if (state == RENTING || state == EVICTED) { if (log.isDebugEnabled()) - log.debug("Waiting for renting partition: " + p); + log.debug("Waiting for renting partition: " + part); // Wait for partition to empty out. - p.rent(true).get(); + part.rent(true).get(); if (log.isDebugEnabled()) - log.debug("Finished waiting for renting partition: " + p); + log.debug("Finished waiting for renting partition: " + part); + } + } - // Remove evicted partition. - it.remove(); + // Remove evicted partition. + lock.writeLock().lock(); - changed = true; + try { + for (int i = 0; i < locParts.length; i++) { + part = locParts[i]; + + if (part == null) + continue; + + if (part.state() == EVICTED) { + locParts[i] = null; + changed = true; + } } } + finally { + lock.writeLock().unlock(); + } return changed; } /** {@inheritDoc} */ - @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"}) + @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) @Override public void readLock() { lock.readLock().lock(); } @@ -267,7 +302,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException { + @Override public void initPartitions( + GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException { U.writeLock(lock); try { @@ -300,12 +336,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology [topVer=" + topVer + - ", cache=" + cctx.name() + - ", futVer=" + exchFut.topologyVersion() + ']'; + ", cache=" + cctx.name() + + ", futVer=" + exchFut.topologyVersion() + ']'; assert cctx.affinity().affinityTopologyVersion().equals(exchFut.topologyVersion()) : "Invalid affinity [topVer=" + cctx.affinity().affinityTopologyVersion() + - ", cache=" + cctx.name()+ - ", futVer=" + exchFut.topologyVersion() + ']'; + ", cache=" + cctx.name() + + ", futVer=" + exchFut.topologyVersion() + ']'; List<List<ClusterNode>> aff = cctx.affinity().assignments(exchFut.topologyVersion()); @@ -587,7 +623,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create) + @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, + boolean create) throws GridDhtInvalidPartitionException { return localPartition(p, topVer, create, true); } @@ -597,21 +634,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return Partition. */ private GridDhtLocalPartition createPartition(int p) { - GridDhtLocalPartition loc = locParts.get(p); - - if (loc != null && loc.state() == EVICTED) { - boolean rmv = locParts.remove(p, loc); + assert lock.isWriteLockedByCurrentThread(); - assert rmv; - - loc = null; - } + GridDhtLocalPartition loc = locParts[p]; - if (loc == null) { - GridDhtLocalPartition old = locParts.putIfAbsent(p, loc = new GridDhtLocalPartition(cctx, p)); - - assert old == null : old; - } + if (loc == null || loc.state() == EVICTED) + locParts[p] = loc = new GridDhtLocalPartition(cctx, p, entryFactory); return loc; } @@ -627,54 +655,59 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { AffinityTopologyVersion topVer, boolean create, boolean updateSeq) { - boolean belongs = create && cctx.affinity().localNode(p, topVer); + GridDhtLocalPartition loc; - while (true) { - GridDhtLocalPartition loc = locParts.get(p); + lock.readLock().lock(); - if (loc != null && loc.state() == EVICTED) { - locParts.remove(p, loc); + try { + loc = locParts[p]; + } + finally { + lock.readLock().unlock(); + } + + if (loc != null && loc.state() != EVICTED) + return loc; + + if (!create) + return null; + + lock.writeLock().lock(); + + try { + loc = locParts[p]; + + boolean belongs = cctx.affinity().localNode(p, topVer); - if (!create) - return null; + if (loc != null && loc.state() == EVICTED) { + locParts[p] = loc = null; if (!belongs) throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " + "(often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); - - continue; } - if (loc == null && create) { + if (loc == null) { if (!belongs) throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " + "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); - lock.writeLock().lock(); - - try { - GridDhtLocalPartition old = locParts.putIfAbsent(p, - loc = new GridDhtLocalPartition(cctx, p)); + locParts[p] = loc = new GridDhtLocalPartition(cctx, p, entryFactory); - if (old != null) - loc = old; - else { - if (updateSeq) - this.updateSeq.incrementAndGet(); + if (updateSeq) + this.updateSeq.incrementAndGet(); - if (log.isDebugEnabled()) - log.debug("Created local partition: " + loc); - } - } - finally { - lock.writeLock().unlock(); - } + if (log.isDebugEnabled()) + log.debug("Created local partition: " + loc); } - - return loc; } + finally { + lock.writeLock().unlock(); + } + + return loc; } /** {@inheritDoc} */ @@ -682,8 +715,20 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert parts != null; assert parts.length > 0; + GridDhtLocalPartition[] locPartsCopy = new GridDhtLocalPartition[parts.length]; + + lock.readLock().lock(); + + try { + for (int i = 0; i < parts.length; i++) + locPartsCopy[i] = locParts[parts[i]]; + } + finally { + lock.readLock().unlock(); + } + for (int i = 0; i < parts.length; i++) - locParts.get(parts[i]).release(); + locPartsCopy[i].release(); } /** {@inheritDoc} */ @@ -693,31 +738,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List<GridDhtLocalPartition> localPartitions() { - return new LinkedList<>(locParts.values()); - } - - /** {@inheritDoc} */ - @Override public Collection<GridDhtLocalPartition> currentLocalPartitions() { - return locParts.values(); - } - - /** {@inheritDoc} */ - @Override public GridDhtLocalPartition onAdded(AffinityTopologyVersion topVer, GridDhtCacheEntry e) { - /* - * Make sure not to acquire any locks here as this method - * may be called from sensitive synchronization blocks. - * =================================================== - */ + LinkedList<GridDhtLocalPartition> list = new LinkedList<>(); - int p = cctx.affinity().partition(e.key()); + lock.readLock().lock(); - GridDhtLocalPartition loc = localPartition(p, topVer, true); + try { + for (int i = 0; i < locParts.length; i++) { + GridDhtLocalPartition part = locParts[i]; - assert loc != null; + if (part != null) + list.add(part); + } - loc.onAdded(e); + return list; + } + finally { + lock.readLock().unlock(); + } + } - return loc; + /** {@inheritDoc} */ + @Override public Collection<GridDhtLocalPartition> currentLocalPartitions() { + return localPartitions(); } /** {@inheritDoc} */ @@ -736,11 +778,22 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public GridDhtPartitionMap2 localPartitionMap() { + Map<Integer, GridDhtPartitionState> map = new HashMap<>(); + lock.readLock().lock(); try { + for (int i = 0; i < locParts.length; i++) { + GridDhtLocalPartition part = locParts[i]; + + if (part == null) + continue; + + map.put(i, part.state()); + } + return new GridDhtPartitionMap2(cctx.nodeId(), updateSeq.get(), topVer, - F.viewReadOnly(locParts, CU.part2state()), true); + Collections.unmodifiableMap(map), true); } finally { lock.readLock().unlock(); @@ -943,7 +996,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { this.cntrMap.put(e.getKey(), e.getValue()); } - for (GridDhtLocalPartition part : locParts.values()) { + for (int i = 0; i < locParts.length; i++) { + GridDhtLocalPartition part = locParts[i]; + + if (part == null) + continue; + Long cntr = cntrMap.get(part.id()); if (cntr != null) @@ -992,7 +1050,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext();) { + for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { UUID nodeId = it.next(); if (!cctx.discovery().alive(nodeId)) { @@ -1078,7 +1136,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { this.cntrMap.put(e.getKey(), e.getValue()); } - for (GridDhtLocalPartition part : locParts.values()) { + for (int i = 0; i < locParts.length; i++) { + GridDhtLocalPartition part = locParts[i]; + + if (part == null) + continue; + Long cntr = cntrMap.get(part.id()); if (cntr != null) @@ -1178,12 +1241,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { UUID locId = cctx.nodeId(); - for (GridDhtLocalPartition part : locParts.values()) { + for (int p = 0; p < locParts.length; p++) { + GridDhtLocalPartition part = locParts[p]; + + if (part == null) + continue; + GridDhtPartitionState state = part.state(); if (state.active()) { - int p = part.id(); - List<ClusterNode> affNodes = aff.get(p); if (!affNodes.contains(cctx.localNode())) { @@ -1403,7 +1469,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { Map<Integer, Long> res = new HashMap<>(cntrMap); - for (GridDhtLocalPartition part : locParts.values()) { + for (int i = 0; i < locParts.length; i++) { + GridDhtLocalPartition part = locParts[i]; + + if (part == null) + continue; + Long cntr0 = res.get(part.id()); Long cntr1 = part.updateCounter(); @@ -1429,11 +1500,23 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); - for (GridDhtLocalPartition part : locParts.values()) { - int size = part.size(); + lock.readLock().lock(); + + try { + for (int i = 0; i < locParts.length; i++) { + GridDhtLocalPartition part = locParts[i]; + + if (part == null) + continue; + + int size = part.size(); - if (size >= threshold) - X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); + if (size >= threshold) + X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); + } + } + finally { + lock.readLock().unlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0b7470b3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java index 38152a7..1d067da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java @@ -120,6 +120,11 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest { writer.incrementState(); + case 9: + if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); } return true; @@ -144,6 +149,14 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest { reader.incrementState(); + case 9: + partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridDhtUnlockRequest.class); @@ -156,6 +169,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 10; } }
