Author: gdusbabek
Date: Fri Nov 19 15:27:33 2010
New Revision: 1036892
URL: http://svn.apache.org/viewvc?rev=1036892&view=rev
Log:
CFS.reload() assumes metadata is mutable. patch by gdusbabek, reviewed by
jbellis. CASSANDRA-1715
Added:
cassandra/trunk/src/java/org/apache/cassandra/utils/DefaultDouble.java
cassandra/trunk/src/java/org/apache/cassandra/utils/DefaultInteger.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1036892&r1=1036891&r2=1036892&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri
Nov 19 15:27:33 2010
@@ -131,11 +131,11 @@ public class ColumnFamilyStore implement
public final CFMetaData metadata;
/* These are locally held copies to be changed from the config during
runtime */
- private int minCompactionThreshold;
- private int maxCompactionThreshold;
- private int memtime;
- private int memsize;
- private double memops;
+ private DefaultInteger minCompactionThreshold;
+ private DefaultInteger maxCompactionThreshold;
+ private DefaultInteger memtime;
+ private DefaultInteger memsize;
+ private DefaultDouble memops;
private final Runnable rowCacheSaverTask = new WrappedRunnable()
{
@@ -152,6 +152,62 @@ public class ColumnFamilyStore implement
ssTables.saveKeyCache();
}
};
+
+ /**
+  * Brings this store back in line with its {@code metadata} after the
+  * metadata object has been mutated in place.
+  * <p>
+  * Steps, in order: refresh each runtime-tunable setting that has not been
+  * overridden (per {@code isModified()}), force a blocking flush, drop
+  * indexes whose column is no longer present in
+  * {@code metadata.column_metadata}, and add indexes for columns that now
+  * declare an {@code index_type}.
+  * <p>
+  * The caller must hold the write lock of {@code Table.flusherLock}; this is
+  * checked with an assertion.
+  *
+  * @throws RuntimeException if the blocking flush is interrupted
+  * @throws IOError if the flush fails; wraps the cause of the ExecutionException
+  */
+ public void reload()
+ {
+     assert Table.flusherLock.writeLock().isHeldByCurrentThread();
+
+     // metadata object has been mutated directly. make all the members jibe with new settings.
+
+     // only update these runtime-modifiable settings if they have not been modified.
+     if (!minCompactionThreshold.isModified())
+         minCompactionThreshold = new DefaultInteger(metadata.minCompactionThreshold);
+     if (!maxCompactionThreshold.isModified())
+         maxCompactionThreshold = new DefaultInteger(metadata.maxCompactionThreshold);
+     if (!memtime.isModified())
+         memtime = new DefaultInteger(metadata.memtableFlushAfterMins);
+     if (!memsize.isModified())
+         memsize = new DefaultInteger(metadata.memtableThroughputInMb);
+     if (!memops.isModified())
+         memops = new DefaultDouble(metadata.memtableOperationsInMillions);
+
+     // reset the memtable with new settings.
+     try
+     {
+         forceBlockingFlush();
+     }
+     catch (InterruptedException ex)
+     {
+         // restore the interrupt status so code further up the stack can still observe it.
+         Thread.currentThread().interrupt();
+         throw new RuntimeException(ex);
+     }
+     catch (ExecutionException ex)
+     {
+         throw new IOError(ex.getCause());
+     }
+
+     // todo: update cache sizes, etc. see SSTableTracker
+
+     // drop indexes no longer needed. collect first, then remove, so that
+     // indexedColumns is not modified while its key set is being iterated.
+     Set<ByteBuffer> indexesToDrop = new HashSet<ByteBuffer>();
+     for (ByteBuffer indexName : indexedColumns.keySet())
+         if (!metadata.column_metadata.containsKey(indexName))
+             indexesToDrop.add(indexName);
+     for (ByteBuffer indexName : indexesToDrop)
+     {
+         ColumnFamilyStore indexCfs = indexedColumns.remove(indexName);
+         assert indexCfs != null;
+         // record the removal under the dropped index's own column family name.
+         // passing metadata.cfName here would record the same (parent) name for
+         // every dropped index.
+         // NOTE(review): assumes setIndexRemoved is keyed by the index CF name -- confirm against SystemTable.
+         SystemTable.setIndexRemoved(metadata.tableName, indexCfs.columnFamily);
+         indexCfs.removeAllSSTables();
+     }
+
+     // there isn't a valid way to update existing indexes at this point (nothing you can change),
+     // so don't bother with them.
+
+     // add indexes that are new
+     for (Map.Entry<ByteBuffer, ColumnDefinition> entry : metadata.column_metadata.entrySet())
+         if (!indexedColumns.containsKey(entry.getKey()) && entry.getValue().index_type != null)
+             addIndex(entry.getValue());
+ }
private ColumnFamilyStore(Table table, String columnFamilyName,
IPartitioner partitioner, int generation, CFMetaData metadata)
{
@@ -159,11 +215,11 @@ public class ColumnFamilyStore implement
this.table = table;
columnFamily = columnFamilyName;
this.metadata = metadata;
- this.minCompactionThreshold = metadata.minCompactionThreshold;
- this.maxCompactionThreshold = metadata.maxCompactionThreshold;
- this.memtime = metadata.memtableFlushAfterMins;
- this.memsize = metadata.memtableThroughputInMb;
- this.memops = metadata.memtableOperationsInMillions;
+ this.minCompactionThreshold = new
DefaultInteger(metadata.minCompactionThreshold);
+ this.maxCompactionThreshold = new
DefaultInteger(metadata.maxCompactionThreshold);
+ this.memtime = new DefaultInteger(metadata.memtableFlushAfterMins);
+ this.memsize = new DefaultInteger(metadata.memtableThroughputInMb);
+ this.memops = new DefaultDouble(metadata.memtableOperationsInMillions);
this.partitioner = partitioner;
fileIndexGenerator.set(generation);
memtable = new Memtable(this);
@@ -536,7 +592,7 @@ public class ColumnFamilyStore implement
*/
public String getFlushPath()
{
- long guessedSize = 2 * memsize * 1024*1024; // 2* adds room for keys,
column indexes
+ long guessedSize = 2 * memsize.value() * 1024*1024; // 2* adds room
for keys, column indexes
String location =
DatabaseDescriptor.getDataFileLocationForTable(table.name, guessedSize);
if (location == null)
throw new RuntimeException("Insufficient disk space to flush");
@@ -1771,70 +1827,70 @@ public class ColumnFamilyStore implement
public int getMinimumCompactionThreshold()
{
- return minCompactionThreshold;
+ return minCompactionThreshold.value();
}
public void setMinimumCompactionThreshold(int minCompactionThreshold)
{
- if ((minCompactionThreshold > this.maxCompactionThreshold) &&
this.maxCompactionThreshold != 0) {
+ if ((minCompactionThreshold > this.maxCompactionThreshold.value()) &&
this.maxCompactionThreshold.value() != 0) {
throw new RuntimeException("The min_compaction_threshold cannot be
larger than the max.");
}
- this.minCompactionThreshold = minCompactionThreshold;
+ this.minCompactionThreshold.set(minCompactionThreshold);
}
public int getMaximumCompactionThreshold()
{
- return maxCompactionThreshold;
+ return maxCompactionThreshold.value();
}
public void setMaximumCompactionThreshold(int maxCompactionThreshold)
{
- if (maxCompactionThreshold < this.minCompactionThreshold) {
+ if (maxCompactionThreshold < this.minCompactionThreshold.value()) {
throw new RuntimeException("The max_compaction_threshold cannot be
smaller than the min.");
}
- this.maxCompactionThreshold = maxCompactionThreshold;
+ this.maxCompactionThreshold.set(maxCompactionThreshold);
}
public void disableAutoCompaction()
{
- this.minCompactionThreshold = 0;
- this.maxCompactionThreshold = 0;
+ minCompactionThreshold.set(0);
+ maxCompactionThreshold.set(0);
}
public int getMemtableFlushAfterMins()
{
- return memtime;
+ return memtime.value();
}
public void setMemtableFlushAfterMins(int time)
{
if (time <= 0) {
throw new RuntimeException("MemtableFlushAfterMins must be greater
than 0.");
}
- this.memtime = time;
+ this.memtime.set(time);
}
public int getMemtableThroughputInMB()
{
- return memsize;
+ return memsize.value();
}
public void setMemtableThroughputInMB(int size)
{
if (size <= 0) {
throw new RuntimeException("MemtableThroughputInMB must be greater
than 0.");
}
- this.memsize = size;
+ this.memsize.set(size);
}
public double getMemtableOperationsInMillions()
{
- return memops;
+ return memops.value();
}
public void setMemtableOperationsInMillions(double ops)
{
if (ops <= 0) {
throw new RuntimeException("MemtableOperationsInMillions must be
greater than 0.0.");
}
- this.memops = ops;
+ this.memops.set(ops);
}
public long estimateKeys()
Added: cassandra/trunk/src/java/org/apache/cassandra/utils/DefaultDouble.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/DefaultDouble.java?rev=1036892&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/DefaultDouble.java
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/DefaultDouble.java Fri
Nov 19 15:27:33 2010
@@ -0,0 +1,47 @@
+package org.apache.cassandra.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Holds a {@code double} setting together with the value it was created with,
+ * so that callers can tell whether the setting currently differs from that
+ * initial value.
+ */
+public class DefaultDouble
+{
+    private final double originalValue;
+    private double currentValue;
+
+    /**
+     * @param value the initial value; it is also remembered as the baseline
+     *              that {@link #isModified()} compares against
+     */
+    public DefaultDouble(double value)
+    {
+        originalValue = value;
+        currentValue = value;
+    }
+
+    /**
+     * @return the current value
+     */
+    public double value()
+    {
+        return currentValue;
+    }
+
+    /**
+     * Replaces the current value; the remembered initial value is unchanged.
+     *
+     * @param d the new current value
+     */
+    public void set(double d)
+    {
+        currentValue = d;
+    }
+
+    /**
+     * @return true if the current value differs from the initial value.
+     *         Setting the value back to the initial value makes this false again.
+     */
+    public boolean isModified()
+    {
+        // NaN != NaN is always true, so a plain != would report a NaN
+        // initial value as modified even though it was never changed.
+        if (Double.isNaN(originalValue))
+            return !Double.isNaN(currentValue);
+        return originalValue != currentValue;
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/utils/DefaultInteger.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/DefaultInteger.java?rev=1036892&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/DefaultInteger.java
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/DefaultInteger.java Fri
Nov 19 15:27:33 2010
@@ -0,0 +1,47 @@
+package org.apache.cassandra.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Pairs an {@code int} setting with the value it started out with, so that
+ * callers can ask whether the setting currently differs from that starting
+ * value.
+ */
+public class DefaultInteger
+{
+    private final int initial;
+    private int current;
+
+    /**
+     * @param value the starting value; also kept as the baseline for
+     *              {@link #isModified()}
+     */
+    public DefaultInteger(int value)
+    {
+        this.initial = value;
+        this.current = value;
+    }
+
+    /**
+     * @return the value currently held
+     */
+    public int value()
+    {
+        return this.current;
+    }
+
+    /**
+     * Overwrites the held value; the starting value is left untouched.
+     *
+     * @param i the new value
+     */
+    public void set(int i)
+    {
+        this.current = i;
+    }
+
+    /**
+     * @return true when the held value is not equal to the starting value
+     */
+    public boolean isModified()
+    {
+        return !(this.current == this.initial);
+    }
+}