IGNITE-2264: Reverted previous changes, added LeanSet.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9533a23e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9533a23e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9533a23e Branch: refs/heads/ignite-2264 Commit: 9533a23e4641ea7f58645b89f425b73aab93ee41 Parents: 3ce735b Author: vozerov-gridgain <voze...@gridgain.com> Authored: Sun Jan 3 19:32:07 2016 +0400 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Sun Jan 3 19:32:07 2016 +0400 ---------------------------------------------------------------------- .../distributed/GridDistributedTxMapping.java | 120 +---- .../distributed/dht/GridDhtTxPrepareFuture.java | 49 +- .../apache/ignite/internal/util/LeanSet.java | 453 +++++++++++++++++++ .../ignite/internal/util/SingletonSet.java | 98 ---- 4 files changed, 506 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9533a23e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index 52bfcb2..8c9f181 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -22,16 +22,12 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashSet; -import java.util.Map; - import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.SingletonSet; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -44,9 +40,6 @@ import org.jetbrains.annotations.Nullable; * Transaction node mapping. */ public class GridDistributedTxMapping implements Externalizable { - /** Empty entries. */ - private static final Collection<IgniteTxEntry> EMPTY = Collections.emptySet(); - /** */ private static final long serialVersionUID = 0L; @@ -86,7 +79,7 @@ public class GridDistributedTxMapping implements Externalizable { public GridDistributedTxMapping(ClusterNode node) { this.node = node; - entries = EMPTY; + entries = new LinkedHashSet<>(); } /** @@ -195,19 +188,6 @@ public class GridDistributedTxMapping implements Externalizable { * @param entry Adds entry. */ public void add(IgniteTxEntry entry) { - if (entries == EMPTY) { - entries = SingletonSet.create(entry); - - return; - } - else if (entries instanceof SingletonSet) { - Collection<IgniteTxEntry> entries0 = new LinkedHashSet<>(); - - entries0.add(((SingletonSet<IgniteTxEntry>)entries).element()); - - entries = entries0; - } - entries.add(entry); } @@ -216,89 +196,32 @@ public class GridDistributedTxMapping implements Externalizable { * @return {@code True} if entry was removed. */ public boolean removeEntry(IgniteTxEntry entry) { - if (entries != EMPTY) { - if (F.eq(entry, ((SingletonSet<IgniteTxEntry>)entries).element())) { - entries = EMPTY; - - return true; - } - else - return entries.remove(entry); - } - - return false; + return entries.remove(entry); } /** - * Remove invalid partitions. - * - * @param invalidParts Invalid partitions. + * @param keys Keys to evict readers for. */ - public void removeInvalidPartitions(Collection<Integer> invalidParts) { - if (entries != EMPTY) { - if (entries instanceof SingletonSet) { - if (invalidParts.contains(((SingletonSet<IgniteTxEntry>)entries).element().cached().partition())) - entries = EMPTY; - } - else { - for (Iterator<IgniteTxEntry> it = entries.iterator(); it.hasNext();) { - IgniteTxEntry entry = it.next(); - - if (invalidParts.contains(entry.cached().partition())) - it.remove(); - } - } - } - } + public void evictReaders(@Nullable Collection<IgniteTxKey> keys) { + if (keys == null || keys.isEmpty()) + return; - /** - * Remove invalid partitions by cache ID. - * - * @param invalidPartsMap Invalid partitions map. - */ - public void removeInvalidPartitionsByCacheId(Map<Integer, int[]> invalidPartsMap) { - if (entries != EMPTY) { - if (entries instanceof SingletonSet) { - IgniteTxEntry entry = ((SingletonSet<IgniteTxEntry>)entries).element(); - - int[] invalidParts = invalidPartsMap.get(entry.cacheId()); - - if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) - entries = EMPTY; - } - else { - for (Iterator<IgniteTxEntry> it = entries.iterator(); it.hasNext();) { - IgniteTxEntry entry = it.next(); - - int[] invalidParts = invalidPartsMap.get(entry.cacheId()); - - if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) - it.remove(); - } - } - } + evictReaders(keys, entries); } /** * @param keys Keys to evict readers for. + * @param entries Entries to check. */ - public void evictReaders(@Nullable Collection<IgniteTxKey> keys) { - if (keys == null || keys.isEmpty()) + private void evictReaders(Collection<IgniteTxKey> keys, @Nullable Collection<IgniteTxEntry> entries) { + if (entries == null || entries.isEmpty()) return; - if (entries != EMPTY) { - if (entries instanceof SingletonSet) { - if (keys.contains(((SingletonSet<IgniteTxEntry>)entries).element().txKey())) - entries = EMPTY; - } - else { - for (Iterator<IgniteTxEntry> it = entries.iterator(); it.hasNext(); ) { - IgniteTxEntry entry = it.next(); - - if (keys.contains(entry.txKey())) - it.remove(); - } - } + for (Iterator<IgniteTxEntry> it = entries.iterator(); it.hasNext();) { + IgniteTxEntry entry = it.next(); + + if (keys.contains(entry.txKey())) + it.remove(); } } @@ -322,18 +245,7 @@ public class GridDistributedTxMapping implements Externalizable { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { node = (ClusterNode)in.readObject(); - int size = in.readInt(); - - if (size <= 0) - entries = EMPTY; - else if (size == 1) - entries = SingletonSet.create((IgniteTxEntry)in.readObject()); - else { - entries = U.newLinkedHashSet(size); - - for (int i = 0; i < size; i++) - entries.add((IgniteTxEntry)in.readObject()); - } + entries = U.readCollection(in); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9533a23e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index c08d72e..ebfb868 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -372,11 +372,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter boolean modified = false; - for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>( - txEntry.context(), key, val, txEntry.cached().version(), txEntry.keepBinary()); + for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>( + txEntry.context(), key, val, txEntry.cached().version(), txEntry.keepBinary()); - try { + try { EntryProcessor<Object, Object, Object> processor = t.get1(); procRes = processor.process(invokeEntry, t.get2()); @@ -389,7 +389,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter break; } - modified |= invokeEntry.modified(); + modified |= invokeEntry.modified(); } if (modified) @@ -1213,10 +1213,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (entry.explicitVersion() == null) { GridCacheMvccCandidate added = entry.cached().candidate(version()); - assert added != null : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; - assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; + assert added != null : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; if (added != null && added.ownerVersion() != null) req.owned(entry.txKey(), added.ownerVersion()); @@ -1501,12 +1501,37 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter // Process invalid partitions (no need to remap). // Keep this loop for backward compatibility. - if (!F.isEmpty(res.invalidPartitions())) - dhtMapping.removeInvalidPartitions(res.invalidPartitions()); + if (!F.isEmpty(res.invalidPartitions())) { + for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) { + IgniteTxEntry entry = it.next(); + + if (res.invalidPartitions().contains(entry.cached().partition())) { + it.remove(); + + if (log.isDebugEnabled()) + log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() + + ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); + } + } + } // Process invalid partitions (no need to remap). if (!F.isEmpty(res.invalidPartitionsByCacheId())) { - dhtMapping.removeInvalidPartitionsByCacheId(res.invalidPartitionsByCacheId()); + Map<Integer, int[]> invalidPartsMap = res.invalidPartitionsByCacheId(); + + for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) { + IgniteTxEntry entry = it.next(); + + int[] invalidParts = invalidPartsMap.get(entry.cacheId()); + + if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) { + it.remove(); + + if (log.isDebugEnabled()) + log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() + + ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); + } + } if (dhtMapping.empty()) { dhtMap.remove(nodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/9533a23e/modules/core/src/main/java/org/apache/ignite/internal/util/LeanSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/LeanSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/LeanSet.java new file mode 100644 index 0000000..ae08736 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/LeanSet.java @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; + +import java.util.Collection; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * Lean set which contains up to a single element in a field. + */ +public class LeanSet<T> implements Set<T> { + /** Hash set factory. */ + private static Factory HASH_SET_FACTORY = new HashSetFactory(); + + /** Linked hash set factory. */ + private static Factory LINKED_HASH_SET_FACTORY = new LinkedHashSetFactory(); + + /** + * Get hash set factory. + * + * @return Hash set factory. + */ + @SuppressWarnings("unchecked") + public static <T> Factory<T> hashSetFactory() { + return HASH_SET_FACTORY; + } + + /** + * Get linked hash set factory. + * + * @return Linked hash set factory. + */ + @SuppressWarnings("unchecked") + public static <T> Factory<T> linkedHashSetFactory() { + return LINKED_HASH_SET_FACTORY; + } + + /** Set factory. */ + private final Factory<T> factory; + + /** Target. */ + private Object target; + + /** Current state.*/ + private State state; + + /** + * Default constructor. Falls-back to HashSet if needed. + */ + @SuppressWarnings("unchecked") + public LeanSet() { + this((Factory<T>)hashSetFactory()); + } + + /** + * Constructor. + * + * @param factory Factory. + */ + public LeanSet(Factory<T> factory) { + A.notNull(factory, "factory"); + + this.factory = factory; + } + + /** {@inheritDoc} */ + @Override public int size() { + switch (state) { + case EMPTY: + return 0; + + case SINGLE: + return 1; + + default: + return asSet().size(); + } + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + switch (state) { + case EMPTY: + return true; + + case SINGLE: + return false; + + default: + return asSet().isEmpty(); + } + } + + /** {@inheritDoc} */ + @Override public boolean contains(Object o) { + switch (state) { + case EMPTY: + return false; + + case SINGLE: + return F.eq(o, target); + + default: + return asSet().contains(o); + } + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator<T> iterator() { + switch (state) { + case EMPTY: + return F.emptyIterator(); + + case SINGLE: + return new SingletonIterator(); + + default: + return asSet().iterator(); + } + } + + /** {@inheritDoc} */ + @NotNull @Override public Object[] toArray() { + switch (state) { + case EMPTY: + return new Object[0]; + + case SINGLE: + return new Object[] { target }; + + default: + return asSet().toArray(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"SuspiciousToArrayCall", "unchecked", "NullableProblems"}) + @NotNull @Override public <T1> T1[] toArray(T1[] a) { + + switch (state) { + case EMPTY: + if (a.length > 0) + a[0] = null; + + return a; + + case SINGLE: + if (a.length == 0) + return (T1[]) (new Object[] { target }); + else { + a[0] = (T1)target; + + if (a.length > 1) + a[1] = null; + + return a; + } + + default: + return asSet().toArray(a); + } + } + + /** {@inheritDoc} */ + @Override public boolean add(T t) { + switch (state) { + case EMPTY: + target = t; + + state = State.SINGLE; + + return true; + + case SINGLE: + return !F.eq(target, t) && toInflated().add(t); + + default: + return add(t); + } + } + + /** {@inheritDoc} */ + @Override public boolean remove(Object o) { + switch (state) { + case EMPTY: + return false; + + case SINGLE: + if (F.eq(target, o)) { + toEmpty(); + + return true; + } + else + return false; + + default: + return asSet().remove(o); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override public boolean containsAll(Collection<?> c) { + switch (state) { + case EMPTY: + return false; + + case SINGLE: + return c.size() == 1 && c.contains(target); + + default: + return asSet().containsAll(c); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override public boolean addAll(Collection<? extends T> c) { + switch (state) { + case EMPTY: + case SINGLE: + return !c.isEmpty() && toInflated().addAll(c); + + default: + return asSet().addAll(c); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override public boolean retainAll(Collection<?> c) { + switch (state) { + case EMPTY: + return false; + + case SINGLE: + if (!c.contains(target)) { + toEmpty(); + + return true; + } + else + return false; + + default: + return asSet().retainAll(c); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override public boolean removeAll(Collection<?> c) { + switch (state) { + case EMPTY: + return false; + + case SINGLE: + if (c.contains(target)) { + toEmpty(); + + return true; + } + else + return false; + + default: + return asSet().removeAll(c); + } + } + + /** {@inheritDoc} */ + @Override public void clear() { + switch (state) { + case SINGLE: + toEmpty(); + + case INFLATED: + asSet().clear(); + } + } + + /** + * Get target as object. + * + * @return Object. + */ + @SuppressWarnings("unchecked") + private T asObject() { + assert state == State.SINGLE; + + return (T)target; + } + + /** + * Get target as set. + * + * @return Set. + */ + @SuppressWarnings("unchecked") + private Set<T> asSet() { + assert state == State.INFLATED; + + return (Set<T>)target; + } + + /** + * Inflate to fully-fledged set. + * + * @return Created set. + */ + private Set<T> toInflated() { + assert state != State.INFLATED; + + Set<T> set = factory.create(); + + if (state == State.SINGLE) + set.add(asObject()); + + state = State.INFLATED; + + return set; + } + + /** + * Set to empty state. + */ + private void toEmpty() { + target = null; + + state = State.EMPTY; + } + + /** + * State. + */ + private enum State { + /** Empty. */ + EMPTY, + + /** Single element exists. */ + SINGLE, + + /** Inflated. */ + INFLATED + } + + /** + * Iterator for a single object. + */ + private class SingletonIterator implements Iterator<T> { + /** Whether iterator is already advanced. */ + private boolean advanced; + + /** Whether element is removed. */ + private boolean removed; + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return !advanced; + } + + /** {@inheritDoc} */ + @Override public T next() { + if (!hasNext()) + throw new NoSuchElementException(); + + checkConcurrentModification(); + + advanced = true; + + return asObject(); + } + + /** {@inheritDoc} */ + @Override public void remove() { + if (hasNext()) + throw new IllegalStateException("next() method was not called."); + + if (removed) + throw new IllegalStateException("remove() was already called."); + + checkConcurrentModification(); + + removed = true; + + toEmpty(); + } + + /** + * Check concurrent modification. + */ + private void checkConcurrentModification() { + if (state != State.SINGLE) + throw new ConcurrentModificationException(); + } + } + + /** + * Factory for real backing set. + */ + public interface Factory<T> { + /** + * Create a set. + * + * @return Set. + */ + Set<T> create(); + } + + /** + * Hash set factory. + */ + private static class HashSetFactory<T> implements Factory<T> { + /** {@inheritDoc} */ + @Override public Set<T> create() { + return U.newHashSet(2); + } + } + + /** + * Linked hash set factory. + */ + private static class LinkedHashSetFactory<T> implements Factory<T> { + /** {@inheritDoc} */ + @Override public Set<T> create() { + return U.newLinkedHashSet(2); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9533a23e/modules/core/src/main/java/org/apache/ignite/internal/util/SingletonSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SingletonSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/SingletonSet.java deleted file mode 100644 index 3680122..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/SingletonSet.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util; - -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.NotNull; - -import java.util.AbstractSet; -import java.util.Iterator; -import java.util.NoSuchElementException; - -/** - * Singleton set. - */ -public class SingletonSet<E> extends AbstractSet<E> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Factory method. - * - * @param elem Element. - * @return Singleton set. - */ - public static <T> SingletonSet<T> create(T elem) { - return new SingletonSet<>(elem); - } - - /** Element. */ - private final E elem; - - /** - * Constructor. - * @param elem Element. - */ - SingletonSet(E elem) { - this.elem = elem; - } - - /** - * Get element. - * - * @return Element. - */ - public E element() { - return elem; - } - - /** {@inheritDoc} */ - @NotNull public Iterator<E> iterator() { - return new Iterator<E>() { - private boolean hasNext = true; - - @Override public boolean hasNext() { - return hasNext; - } - - @Override public E next() { - if (hasNext) { - hasNext = false; - - return elem; - } - else - throw new NoSuchElementException(); - } - - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - /** {@inheritDoc} */ - public int size() { - return 1; - } - - /** {@inheritDoc} */ - public boolean contains(Object o) { - return F.eq(o, elem); - } -}