Repository: ignite Updated Branches: refs/heads/ignite-2264 [created] d35afa83b
IGNITE-2264 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d35afa83 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d35afa83 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d35afa83 Branch: refs/heads/ignite-2264 Commit: d35afa83b5536dec9b8b8c9078eb4240702f9b93 Parents: 18c413c Author: vozerov-gridgain <[email protected]> Authored: Sat Dec 26 16:28:12 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Sat Dec 26 16:28:42 2015 +0300 ---------------------------------------------------------------------- .../distributed/GridDistributedTxMapping.java | 120 ++++++++++++++++--- .../distributed/dht/GridDhtTxPrepareFuture.java | 31 +---- .../ignite/internal/util/SingletonSet.java | 98 +++++++++++++++ 3 files changed, 205 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d35afa83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index 8c9f181..52bfcb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -22,12 +22,16 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashSet; +import java.util.Map; + import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.SingletonSet; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -40,6 +44,9 @@ import org.jetbrains.annotations.Nullable; * Transaction node mapping. */ public class GridDistributedTxMapping implements Externalizable { + /** Empty entries. */ + private static final Collection<IgniteTxEntry> EMPTY = Collections.emptySet(); + /** */ private static final long serialVersionUID = 0L; @@ -79,7 +86,7 @@ public class GridDistributedTxMapping implements Externalizable { public GridDistributedTxMapping(ClusterNode node) { this.node = node; - entries = new LinkedHashSet<>(); + entries = EMPTY; } /** @@ -188,6 +195,19 @@ public class GridDistributedTxMapping implements Externalizable { * @param entry Adds entry. */ public void add(IgniteTxEntry entry) { + if (entries == EMPTY) { + entries = SingletonSet.create(entry); + + return; + } + else if (entries instanceof SingletonSet) { + Collection<IgniteTxEntry> entries0 = new LinkedHashSet<>(); + + entries0.add(((SingletonSet<IgniteTxEntry>)entries).element()); + + entries = entries0; + } + entries.add(entry); } @@ -196,32 +216,89 @@ public class GridDistributedTxMapping implements Externalizable { * @return {@code True} if entry was removed. */ public boolean removeEntry(IgniteTxEntry entry) { - return entries.remove(entry); + if (entries != EMPTY) { + if (F.eq(entry, ((SingletonSet<IgniteTxEntry>)entries).element())) { + entries = EMPTY; + + return true; + } + else + return entries.remove(entry); + } + + return false; } /** - * @param keys Keys to evict readers for. + * Remove invalid partitions. + * + * @param invalidParts Invalid partitions. */ - public void evictReaders(@Nullable Collection<IgniteTxKey> keys) { - if (keys == null || keys.isEmpty()) - return; + public void removeInvalidPartitions(Collection<Integer> invalidParts) { + if (entries != EMPTY) { + if (entries instanceof SingletonSet) { + if (invalidParts.contains(((SingletonSet<IgniteTxEntry>)entries).element().cached().partition())) + entries = EMPTY; + } + else { + for (Iterator<IgniteTxEntry> it = entries.iterator(); it.hasNext();) { + IgniteTxEntry entry = it.next(); + + if (invalidParts.contains(entry.cached().partition())) + it.remove(); + } + } + } + } - evictReaders(keys, entries); + /** + * Remove invalid partitions by cache ID. + * + * @param invalidPartsMap Invalid partitions map. + */ + public void removeInvalidPartitionsByCacheId(Map<Integer, int[]> invalidPartsMap) { + if (entries != EMPTY) { + if (entries instanceof SingletonSet) { + IgniteTxEntry entry = ((SingletonSet<IgniteTxEntry>)entries).element(); + + int[] invalidParts = invalidPartsMap.get(entry.cacheId()); + + if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) + entries = EMPTY; + } + else { + for (Iterator<IgniteTxEntry> it = entries.iterator(); it.hasNext();) { + IgniteTxEntry entry = it.next(); + + int[] invalidParts = invalidPartsMap.get(entry.cacheId()); + + if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) + it.remove(); + } + } + } } /** * @param keys Keys to evict readers for. - * @param entries Entries to check. */ - private void evictReaders(Collection<IgniteTxKey> keys, @Nullable Collection<IgniteTxEntry> entries) { - if (entries == null || entries.isEmpty()) + public void evictReaders(@Nullable Collection<IgniteTxKey> keys) { + if (keys == null || keys.isEmpty()) return; - for (Iterator<IgniteTxEntry> it = entries.iterator(); it.hasNext();) { - IgniteTxEntry entry = it.next(); - - if (keys.contains(entry.txKey())) - it.remove(); + if (entries != EMPTY) { + if (entries instanceof SingletonSet) { + if (keys.contains(((SingletonSet<IgniteTxEntry>)entries).element().txKey())) + entries = EMPTY; + } + else { + for (Iterator<IgniteTxEntry> it = entries.iterator(); it.hasNext(); ) { + IgniteTxEntry entry = it.next(); + + if (keys.contains(entry.txKey())) + it.remove(); + } + } } } @@ -245,7 +322,18 @@ public class GridDistributedTxMapping implements Externalizable { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { node = (ClusterNode)in.readObject(); - entries = U.readCollection(in); + int size = in.readInt(); + + if (size <= 0) + entries = EMPTY; + else if (size == 1) + entries = SingletonSet.create((IgniteTxEntry)in.readObject()); + else { + entries = U.newLinkedHashSet(size); + + for (int i = 0; i < size; i++) + entries.add((IgniteTxEntry)in.readObject()); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d35afa83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 40399b4..903822c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1502,37 +1502,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter // Process invalid partitions (no need to remap). // Keep this loop for backward compatibility. - if (!F.isEmpty(res.invalidPartitions())) { - for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) { - IgniteTxEntry entry = it.next(); - - if (res.invalidPartitions().contains(entry.cached().partition())) { - it.remove(); - - if (log.isDebugEnabled()) - log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() + - ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); - } - } - } + if (!F.isEmpty(res.invalidPartitions())) + dhtMapping.removeInvalidPartitions(res.invalidPartitions()); // Process invalid partitions (no need to remap). if (!F.isEmpty(res.invalidPartitionsByCacheId())) { - Map<Integer, int[]> invalidPartsMap = res.invalidPartitionsByCacheId(); - - for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) { - IgniteTxEntry entry = it.next(); - - int[] invalidParts = invalidPartsMap.get(entry.cacheId()); - - if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) { - it.remove(); - - if (log.isDebugEnabled()) - log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() + - ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); - } - } + dhtMapping.removeInvalidPartitionsByCacheId(res.invalidPartitionsByCacheId()); if (dhtMapping.empty()) { dhtMap.remove(nodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/d35afa83/modules/core/src/main/java/org/apache/ignite/internal/util/SingletonSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SingletonSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/SingletonSet.java new file mode 100644 index 0000000..3680122 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/SingletonSet.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.NotNull; + +import java.util.AbstractSet; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Singleton set. + */ +public class SingletonSet<E> extends AbstractSet<E> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Factory method. + * + * @param elem Element. + * @return Singleton set. + */ + public static <T> SingletonSet<T> create(T elem) { + return new SingletonSet<>(elem); + } + + /** Element. */ + private final E elem; + + /** + * Constructor. + * @param elem Element. + */ + SingletonSet(E elem) { + this.elem = elem; + } + + /** + * Get element. + * + * @return Element. + */ + public E element() { + return elem; + } + + /** {@inheritDoc} */ + @NotNull public Iterator<E> iterator() { + return new Iterator<E>() { + private boolean hasNext = true; + + @Override public boolean hasNext() { + return hasNext; + } + + @Override public E next() { + if (hasNext) { + hasNext = false; + + return elem; + } + else + throw new NoSuchElementException(); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + /** {@inheritDoc} */ + public int size() { + return 1; + } + + /** {@inheritDoc} */ + public boolean contains(Object o) { + return F.eq(o, elem); + } +}
