Fix schema concurrency exceptions patch by Benedict Elliott Smith; reviewed by Carl Yeksigian for CASSANDRA-6841
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bfd03f19 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bfd03f19 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bfd03f19 Branch: refs/heads/trunk Commit: bfd03f193938104cc86d59ccda137eece5cfd176 Parents: c49d336 Author: Jonathan Ellis <[email protected]> Authored: Fri Mar 14 13:11:25 2014 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Fri Mar 14 13:12:05 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/Schema.java | 8 +- .../db/commitlog/CommitLogAllocator.java | 2 +- .../db/commitlog/CommitLogSegment.java | 15 ++- .../apache/cassandra/utils/ConcurrentBiMap.java | 131 +++++++++++++++++++ 5 files changed, 147 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e208e21..6483012 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.7 + * Fix schema concurrency exceptions (CASSANDRA-6841) * Fix leaking validator FH in StreamWriter (CASSANDRA-6832) * Fix saving triggers to schema (CASSANDRA-6789) * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index d822704..0907177 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -23,7 +23,9 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; -import com.google.common.collect.*; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +34,7 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.service.MigrationManager; -import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ConcurrentBiMap; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -58,7 +60,7 @@ public class Schema private final Map<String, Keyspace> keyspaceInstances = new NonBlockingHashMap<String, Keyspace>(); /* metadata map for faster ColumnFamily lookup */ - private final BiMap<Pair<String, String>, UUID> cfIdMap = HashBiMap.create(); + private final ConcurrentBiMap<Pair<String, String>, UUID> cfIdMap = new ConcurrentBiMap<>(); private volatile UUID version; http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java index 575e3c3..3009a63 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java @@ -304,7 +304,7 @@ public class CommitLogAllocator { CommitLogSegment oldestSegment = activeSegments.peek(); - if (oldestSegment != null) + if (oldestSegment != null && oldestSegment != CommitLog.instance.activeSegment) { for (UUID dirtyCFId : oldestSegment.getDirtyCFIDs()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 25658ed..5b8bcfa 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -25,10 +25,13 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.Checksum; @@ -63,7 +66,7 @@ public class CommitLogSegment static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8; // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we can delete this segment - private final HashMap<UUID, Integer> cfLastWrite = new HashMap<UUID, Integer>(); + private final Map<UUID, Integer> cfLastWrite = new HashMap<>(); public final long id; @@ -355,7 +358,7 @@ public class CommitLogSegment * @param cfId the column family ID that is now clean * @param context the optional clean offset */ - public void markClean(UUID cfId, ReplayPosition context) + public synchronized void markClean(UUID cfId, ReplayPosition context) { Integer lastWritten = cfLastWrite.get(cfId); @@ -368,15 +371,15 @@ public class CommitLogSegment /** * @return a collection of dirty CFIDs for this segment file. */ - public Collection<UUID> getDirtyCFIDs() + public synchronized Collection<UUID> getDirtyCFIDs() { - return cfLastWrite.keySet(); + return new ArrayList<>(cfLastWrite.keySet()); } /** * @return true if this segment is unused and safe to recycle or delete */ - public boolean isUnused() + public synchronized boolean isUnused() { return cfLastWrite.isEmpty(); } @@ -396,7 +399,7 @@ public class CommitLogSegment public String dirtyString() { StringBuilder sb = new StringBuilder(); - for (UUID cfId : cfLastWrite.keySet()) + for (UUID cfId : getDirtyCFIDs()) { CFMetaData m = Schema.instance.getCFMetaData(cfId); sb.append(m == null ? "<deleted>" : m.cfName).append(" (").append(cfId).append("), "); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java b/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java new file mode 100644 index 0000000..b4dfa2e --- /dev/null +++ b/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * A variant of BiMap that permits concurrent access, and expects uniqueness of values in both domain and range. + * We synchronize on _modifications only_, and use ConcurrentHashMap so that readers can lookup safely. This does mean there + * could be races to lookup the inverse, but we aren't too worried about that. + * + * @param <K> + * @param <V> + */ +public class ConcurrentBiMap<K, V> implements Map<K, V> +{ + protected final Map<K, V> forwardMap; + protected final Map<V, K> reverseMap; + + public ConcurrentBiMap() + { + this(new ConcurrentHashMap<K, V>(16, 0.5f, 1), new ConcurrentHashMap<V, K>(16, 0.5f, 1)); + } + + protected ConcurrentBiMap(Map<K, V> forwardMap, Map<V, K> reverseMap) + { + this.forwardMap = forwardMap; + this.reverseMap = reverseMap; + } + + public Map<V, K> inverse() + { + return Collections.unmodifiableMap(reverseMap); + } + + public void clear() + { + forwardMap.clear(); + reverseMap.clear(); + } + + public boolean containsKey(Object key) + { + return forwardMap.containsKey(key); + } + + public boolean containsValue(Object value) + { + return reverseMap.containsKey(value); + } + + public Set<Entry<K, V>> entrySet() + { + return forwardMap.entrySet(); + } + + public V get(Object key) + { + return forwardMap.get(key); + } + + public boolean isEmpty() + { + return forwardMap.isEmpty(); + } + + public Set<K> keySet() + { + return forwardMap.keySet(); + } + + public synchronized V put(K key, V value) + { + K oldKey = reverseMap.get(value); + if (oldKey != null && !key.equals(oldKey)) + throw new IllegalArgumentException(value + " is already bound in reverseMap to " + oldKey); + V oldVal = forwardMap.put(key, value); + if (oldVal != null && !Objects.equals(reverseMap.remove(oldVal), key)) + throw new IllegalStateException(); // for the prior mapping to be correct, we MUST get back the key from the reverseMap + reverseMap.put(value, key); + return oldVal; + } + + public synchronized void putAll(Map<? extends K, ? extends V> m) + { + for (Entry<? extends K, ? extends V> entry : m.entrySet()) + put(entry.getKey(), entry.getValue()); + } + + public synchronized V remove(Object key) + { + V oldVal = forwardMap.remove(key); + if (oldVal == null) + return null; + Object oldKey = reverseMap.remove(oldVal); + if (oldKey == null || !oldKey.equals(key)) + throw new IllegalStateException(); // for the prior mapping to be correct, we MUST get back the key from the reverseMap + return oldVal; + } + + public int size() + { + return forwardMap.size(); + } + + public Collection<V> values() + { + return reverseMap.keySet(); + } +}
