Author: jbellis
Date: Tue Nov 23 19:27:44 2010
New Revision: 1038293
URL: http://svn.apache.org/viewvc?rev=1038293&view=rev
Log:
rebuild Strategy during system_update_keyspace
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1762
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/TokenMetadata.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Nov 23 19:27:44 2010
@@ -8,6 +8,7 @@ dev
* require index_type to be present when specifying index_name
on ColumnDef (CASSANDRA-1759)
* fix add/remove index bugs in CFMetadata (CASSANDRA-1768)
+ * rebuild Strategy during system_update_keyspace (CASSANDRA-1762)
0.7.0-rc1
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
Tue Nov 23 19:27:44 2010
@@ -96,7 +96,7 @@ public class Table
public final Map<Integer, ColumnFamilyStore> columnFamilyStores = new
HashMap<Integer, ColumnFamilyStore>(); // TODO make private again
private final Object[] indexLocks;
private ScheduledFuture<?> flushTask;
- public final AbstractReplicationStrategy replicationStrategy;
+ private volatile AbstractReplicationStrategy replicationStrategy;
public static Table open(String table)
{
@@ -244,11 +244,7 @@ public class Table
KSMetaData ksm = DatabaseDescriptor.getKSMetaData(table);
try
{
- replicationStrategy =
AbstractReplicationStrategy.createReplicationStrategy(table,
-
ksm.strategyClass,
-
StorageService.instance.getTokenMetadata(),
-
DatabaseDescriptor.getEndpointSnitch(),
-
ksm.strategyOptions);
+ createReplicationStrategy(ksm);
}
catch (ConfigurationException e)
{
@@ -302,7 +298,19 @@ public class Table
};
flushTask =
StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, minCheckMs,
minCheckMs, TimeUnit.MILLISECONDS);
}
-
+
+ public void createReplicationStrategy(KSMetaData ksm) throws
ConfigurationException
+ {
+ if (replicationStrategy != null)
+
StorageService.instance.getTokenMetadata().unregister(replicationStrategy);
+
+ replicationStrategy =
AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
+
ksm.strategyClass,
+
StorageService.instance.getTokenMetadata(),
+
DatabaseDescriptor.getEndpointSnitch(),
+
ksm.strategyOptions);
+ }
+
// best invoked on the compaction mananger.
public void dropCf(Integer cfId) throws IOException
{
@@ -557,6 +565,11 @@ public class Table
return new IndexBuilder(cfs, columns, iter);
}
+ public AbstractReplicationStrategy getReplicationStrategy()
+ {
+ return replicationStrategy;
+ }
+
public class IndexBuilder implements ICompactionInfo
{
private final ColumnFamilyStore cfs;
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
Tue Nov 23 19:27:44 2010
@@ -5,6 +5,7 @@ import org.apache.cassandra.config.Confi
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -59,7 +60,18 @@ public class UpdateKeyspace extends Migr
{
DatabaseDescriptor.clearTableDefinition(oldKsm, newVersion);
DatabaseDescriptor.setTableDefinition(newKsm, newVersion);
- Table.open(newKsm.name).replicationStrategy.clearEndpointCache();
+
+
+ Table table = Table.open(newKsm.name);
+ try
+ {
+ table.createReplicationStrategy(newKsm);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IOException(e);
+ }
+
logger.info("Keyspace updated. Please perform any manual operations.");
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
Tue Nov 23 19:27:44 2010
@@ -191,7 +191,7 @@ public class BootStrapper
Multimap<Range, InetAddress> getRangesWithSources(String table)
{
assert tokenMetadata.sortedTokens().size() > 0;
- final AbstractReplicationStrategy strat =
Table.open(table).replicationStrategy;
+ final AbstractReplicationStrategy strat =
Table.open(table).getReplicationStrategy();
Collection<Range> myRanges =
strat.getPendingAddressRanges(tokenMetadata, token, address);
Multimap<Range, InetAddress> myRangeAddresses =
ArrayListMultimap.create();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/TokenMetadata.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/TokenMetadata.java
Tue Nov 23 19:27:44 2010
@@ -540,6 +540,11 @@ public class TokenMetadata
subscribers.add(subscriber);
}
+ public void unregister(AbstractReplicationStrategy subscriber)
+ {
+ subscribers.remove(subscriber);
+ }
+
/**
* write endpoints may be different from read endpoints, because read
endpoints only need care about the
* "natural" nodes for a token, but write endpoints also need to account
for nodes that are bootstrapping
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Tue Nov 23 19:27:44 2010
@@ -65,7 +65,7 @@ public class DatacenterQuorumResponseHan
@Override
public int determineBlockFor(ConsistencyLevel consistency_level, String
table)
{
- NetworkTopologyStrategy stategy = (NetworkTopologyStrategy)
Table.open(table).replicationStrategy;
+ NetworkTopologyStrategy stategy = (NetworkTopologyStrategy)
Table.open(table).getReplicationStrategy();
return (stategy.getReplicationFactor(localdc) / 2) + 1;
}
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
Tue Nov 23 19:27:44 2010
@@ -63,7 +63,7 @@ public class DatacenterSyncWriteResponse
super(writeEndpoints, hintedEndpoints, consistencyLevel);
assert consistencyLevel == ConsistencyLevel.LOCAL_QUORUM;
- strategy = (NetworkTopologyStrategy)
Table.open(table).replicationStrategy;
+ strategy = (NetworkTopologyStrategy)
Table.open(table).getReplicationStrategy();
for (String dc : strategy.getDatacenters())
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
Tue Nov 23 19:27:44 2010
@@ -65,7 +65,7 @@ public class DatacenterWriteResponseHand
@Override
protected int determineBlockFor(String table)
{
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy)
Table.open(table).replicationStrategy;
+ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy)
Table.open(table).getReplicationStrategy();
return (strategy.getReplicationFactor(localdc) / 2) + 1;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Nov 23 19:27:44 2010
@@ -104,7 +104,7 @@ public class StorageProxy implements Sto
{
mostRecentRowMutation = rm;
String table = rm.getTable();
- AbstractReplicationStrategy rs =
Table.open(table).replicationStrategy;
+ AbstractReplicationStrategy rs =
Table.open(table).getReplicationStrategy();
List<InetAddress> naturalEndpoints =
ss.getNaturalEndpoints(table, rm.key());
Collection<InetAddress> writeEndpoints =
ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()),
table, naturalEndpoints);
@@ -342,7 +342,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("strongread reading " + (m == message ?
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" +
endpoint);
}
- AbstractReplicationStrategy rs =
Table.open(command.table).replicationStrategy;
+ AbstractReplicationStrategy rs =
Table.open(command.table).getReplicationStrategy();
QuorumResponseHandler<Row> quorumResponseHandler =
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
consistency_level);
MessagingService.instance.sendRR(messages, endpoints,
quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
@@ -368,7 +368,7 @@ public class StorageProxy implements Sto
}
catch (DigestMismatchException ex)
{
- AbstractReplicationStrategy rs =
Table.open(command.table).replicationStrategy;
+ AbstractReplicationStrategy rs =
Table.open(command.table).getReplicationStrategy();
QuorumResponseHandler<Row> qrhRepair =
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
ConsistencyLevel.QUORUM);
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
@@ -448,7 +448,7 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
- AbstractReplicationStrategy rs =
Table.open(command.keyspace).replicationStrategy;
+ AbstractReplicationStrategy rs =
Table.open(command.keyspace).getReplicationStrategy();
QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
// TODO bail early if live endpoints can't satisfy requested
consistency level
for (InetAddress endpoint : liveEndpoints)
@@ -664,7 +664,7 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(keyspace, liveEndpoints);
- AbstractReplicationStrategy rs =
Table.open(keyspace).replicationStrategy;
+ AbstractReplicationStrategy rs =
Table.open(keyspace).getReplicationStrategy();
QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
// bail early if live endpoints can't satisfy requested
consistency level
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
Tue Nov 23 19:27:44 2010
@@ -558,7 +558,7 @@ public class StorageService implements I
Map<Range, List<InetAddress>> rangeToEndpointMap = new HashMap<Range,
List<InetAddress>>();
for (Range range : ranges)
{
- rangeToEndpointMap.put(range,
Table.open(keyspace).replicationStrategy.getNaturalEndpoints(range.right));
+ rangeToEndpointMap.put(range,
Table.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right));
}
return rangeToEndpointMap;
}
@@ -824,7 +824,7 @@ public class StorageService implements I
private void calculatePendingRanges()
{
for (String table : DatabaseDescriptor.getNonSystemTables())
- calculatePendingRanges(Table.open(table).replicationStrategy,
table);
+ calculatePendingRanges(Table.open(table).getReplicationStrategy(),
table);
}
// public & static for testing purposes
@@ -894,7 +894,7 @@ public class StorageService implements I
private Multimap<InetAddress, Range> getNewSourceRanges(String table,
Set<Range> ranges)
{
InetAddress myAddress = FBUtilities.getLocalAddress();
- Multimap<Range, InetAddress> rangeAddresses =
Table.open(table).replicationStrategy.getRangeAddresses(tokenMetadata_);
+ Multimap<Range, InetAddress> rangeAddresses =
Table.open(table).getReplicationStrategy().getRangeAddresses(tokenMetadata_);
Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
IFailureDetector failureDetector = FailureDetector.instance;
@@ -1017,7 +1017,7 @@ public class StorageService implements I
// Find (for each range) all nodes that store replicas for these
ranges as well
for (Range range : ranges)
- currentReplicaEndpoints.put(range,
Table.open(table).replicationStrategy.calculateNaturalEndpoints(range.right,
tokenMetadata_));
+ currentReplicaEndpoints.put(range,
Table.open(table).getReplicationStrategy().calculateNaturalEndpoints(range.right,
tokenMetadata_));
TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
@@ -1035,7 +1035,7 @@ public class StorageService implements I
// range.
for (Range range : ranges)
{
- Collection<InetAddress> newReplicaEndpoints =
Table.open(table).replicationStrategy.calculateNaturalEndpoints(range.right,
temp);
+ Collection<InetAddress> newReplicaEndpoints =
Table.open(table).getReplicationStrategy().calculateNaturalEndpoints(range.right,
temp);
newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
if (logger_.isDebugEnabled())
if (newReplicaEndpoints.isEmpty())
@@ -1359,7 +1359,7 @@ public class StorageService implements I
*/
Collection<Range> getRangesForEndpoint(String table, InetAddress ep)
{
- return
Table.open(table).replicationStrategy.getAddressRanges().get(ep);
+ return
Table.open(table).getReplicationStrategy().getAddressRanges().get(ep);
}
/**
@@ -1409,7 +1409,7 @@ public class StorageService implements I
*/
public List<InetAddress> getNaturalEndpoints(String table, Token token)
{
- return
Table.open(table).replicationStrategy.getNaturalEndpoints(token);
+ return
Table.open(table).getReplicationStrategy().getNaturalEndpoints(token);
}
/**
@@ -1427,7 +1427,7 @@ public class StorageService implements I
public List<InetAddress> getLiveNaturalEndpoints(String table, Token token)
{
List<InetAddress> liveEps = new ArrayList<InetAddress>();
- List<InetAddress> endpoints =
Table.open(table).replicationStrategy.getNaturalEndpoints(token);
+ List<InetAddress> endpoints =
Table.open(table).getReplicationStrategy().getNaturalEndpoints(token);
for (InetAddress endpoint : endpoints)
{
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
Tue Nov 23 19:27:44 2010
@@ -43,7 +43,7 @@ public class ReplicationStrategyEndpoint
tmd = new TokenMetadata();
searchToken = new BigIntegerToken(String.valueOf(15));
- strategy =
getStrategyWithNewTokenMetadata(Table.open("Keyspace3").replicationStrategy,
tmd);
+ strategy =
getStrategyWithNewTokenMetadata(Table.open("Keyspace3").getReplicationStrategy(),
tmd);
tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)),
InetAddress.getByName("127.0.0.1"));
tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)),
InetAddress.getByName("127.0.0.2"));
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
Tue Nov 23 19:27:44 2010
@@ -43,7 +43,7 @@ public class SimpleStrategyTest extends
@Test
public void tryValidTable()
{
- assert Table.open("Keyspace1").replicationStrategy != null;
+ assert Table.open("Keyspace1").getReplicationStrategy() != null;
}
@Test
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=1038293&r1=1038292&r2=1038293&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Tue Nov 23 19:27:44 2010
@@ -183,7 +183,7 @@ public class AntiEntropyServiceTest exte
// generate rf*2 nodes, and ensure that only neighbors specified by
the ARS are returned
addTokens(2 * DatabaseDescriptor.getReplicationFactor(tablename));
- AbstractReplicationStrategy ars =
Table.open(tablename).replicationStrategy;
+ AbstractReplicationStrategy ars =
Table.open(tablename).getReplicationStrategy();
Set<InetAddress> expected = new HashSet<InetAddress>();
for (Range replicaRange :
ars.getAddressRanges().get(FBUtilities.getLocalAddress()))
{