Updated Branches: refs/heads/trunk c4481e207 -> 60027c4cc
add memtable_flush_period_in_ms patch by yukim; reviewed by jbellis for CASSANDRA-4237 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60027c4c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60027c4c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60027c4c Branch: refs/heads/trunk Commit: 60027c4ccabaab390dbf4c4bba83ac3a843b3a48 Parents: c4481e2 Author: Jonathan Ellis <[email protected]> Authored: Thu Nov 15 17:03:29 2012 +0100 Committer: Jonathan Ellis <[email protected]> Committed: Thu Nov 15 17:03:29 2012 +0100 ---------------------------------------------------------------------- CHANGES.txt | 4 + interface/cassandra.thrift | 1 + .../org/apache/cassandra/thrift/CfDef.java | 110 +++++++++++++-- src/java/org/apache/cassandra/cli/CliClient.java | 4 + .../org/apache/cassandra/config/CFMetaData.java | 22 +++- .../apache/cassandra/cql/AlterTableStatement.java | 1 + src/java/org/apache/cassandra/cql/CFPropDefs.java | 2 + .../cassandra/cql/CreateColumnFamilyStatement.java | 3 +- src/java/org/apache/cassandra/cql3/CFPropDefs.java | 13 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 31 ++++- src/java/org/apache/cassandra/db/Memtable.java | 10 ++ 11 files changed, 178 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index be34e89..4445407 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,7 @@ +1.3 + * add memtable_flush_period_in_ms (CASSANDRA-4237) + + 1.2.1 * pool [Compressed]RandomAccessReader objects on the partitioned read path (CASSANDRA-4942) http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/interface/cassandra.thrift ---------------------------------------------------------------------- diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift index c52263b..0a92a9d 100644 --- a/interface/cassandra.thrift +++ b/interface/cassandra.thrift @@ -442,6 +442,7 @@ struct CfDef { 33: optional double bloom_filter_fp_chance, 34: optional string caching="keys_only", 37: optional double dclocal_read_repair_chance = 0.0, + 38: optional i32 memtable_flush_period_in_ms, /* All of the following are now ignored and unsupplied. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java index ccf7fad..50ec681 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java @@ -67,6 +67,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav private static final org.apache.thrift.protocol.TField BLOOM_FILTER_FP_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("bloom_filter_fp_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)33); private static final org.apache.thrift.protocol.TField CACHING_FIELD_DESC = new org.apache.thrift.protocol.TField("caching", org.apache.thrift.protocol.TType.STRING, (short)34); private static final org.apache.thrift.protocol.TField DCLOCAL_READ_REPAIR_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("dclocal_read_repair_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)37); + private static final org.apache.thrift.protocol.TField MEMTABLE_FLUSH_PERIOD_IN_MS_FIELD_DESC = new org.apache.thrift.protocol.TField("memtable_flush_period_in_ms", org.apache.thrift.protocol.TType.I32, (short)38); private static final org.apache.thrift.protocol.TField ROW_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)9); private static final org.apache.thrift.protocol.TField KEY_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("key_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)11); private static final org.apache.thrift.protocol.TField ROW_CACHE_SAVE_PERIOD_IN_SECONDS_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_save_period_in_seconds", org.apache.thrift.protocol.TType.I32, (short)19); @@ -100,6 +101,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav public double bloom_filter_fp_chance; // required public String caching; // required public double dclocal_read_repair_chance; // required + public int memtable_flush_period_in_ms; // required /** * @deprecated */ @@ -165,6 +167,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav BLOOM_FILTER_FP_CHANCE((short)33, "bloom_filter_fp_chance"), CACHING((short)34, "caching"), DCLOCAL_READ_REPAIR_CHANCE((short)37, "dclocal_read_repair_chance"), + MEMTABLE_FLUSH_PERIOD_IN_MS((short)38, "memtable_flush_period_in_ms"), /** * @deprecated */ @@ -263,6 +266,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav return CACHING; case 37: // DCLOCAL_READ_REPAIR_CHANCE return DCLOCAL_READ_REPAIR_CHANCE; + case 38: // MEMTABLE_FLUSH_PERIOD_IN_MS + return MEMTABLE_FLUSH_PERIOD_IN_MS; case 9: // ROW_CACHE_SIZE return ROW_CACHE_SIZE; case 11: // KEY_CACHE_SIZE @@ -331,16 +336,17 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav private static final int __REPLICATE_ON_WRITE_ISSET_ID = 5; private static final int __BLOOM_FILTER_FP_CHANCE_ISSET_ID = 6; private static final int __DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID = 7; - private static final int __ROW_CACHE_SIZE_ISSET_ID = 8; - private static final int __KEY_CACHE_SIZE_ISSET_ID = 9; - private static final int __ROW_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 10; - private static final int __KEY_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 11; - private static final int __MEMTABLE_FLUSH_AFTER_MINS_ISSET_ID = 12; - private static final int __MEMTABLE_THROUGHPUT_IN_MB_ISSET_ID = 13; - private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 14; - private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 15; - private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 16; - private BitSet __isset_bit_vector = new BitSet(17); + private static final int __MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID = 8; + private static final int __ROW_CACHE_SIZE_ISSET_ID = 9; + private static final int __KEY_CACHE_SIZE_ISSET_ID = 10; + private static final int __ROW_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 11; + private static final int __KEY_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 12; + private static final int __MEMTABLE_FLUSH_AFTER_MINS_ISSET_ID = 13; + private static final int __MEMTABLE_THROUGHPUT_IN_MB_ISSET_ID = 14; + private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 15; + private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 16; + private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 17; + private BitSet __isset_bit_vector = new BitSet(18); public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -394,6 +400,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.DCLOCAL_READ_REPAIR_CHANCE, new org.apache.thrift.meta_data.FieldMetaData("dclocal_read_repair_chance", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))); + tmpMap.put(_Fields.MEMTABLE_FLUSH_PERIOD_IN_MS, new org.apache.thrift.meta_data.FieldMetaData("memtable_flush_period_in_ms", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.ROW_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))); tmpMap.put(_Fields.KEY_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("key_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL, @@ -523,6 +531,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav this.caching = other.caching; } this.dclocal_read_repair_chance = other.dclocal_read_repair_chance; + this.memtable_flush_period_in_ms = other.memtable_flush_period_in_ms; this.row_cache_size = other.row_cache_size; this.key_cache_size = other.key_cache_size; this.row_cache_save_period_in_seconds = other.row_cache_save_period_in_seconds; @@ -576,6 +585,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav this.dclocal_read_repair_chance = 0; + setMemtable_flush_period_in_msIsSet(false); + this.memtable_flush_period_in_ms = 0; setRow_cache_sizeIsSet(false); this.row_cache_size = 0.0; setKey_cache_sizeIsSet(false); @@ -1164,6 +1175,29 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav __isset_bit_vector.set(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID, value); } + public int getMemtable_flush_period_in_ms() { + return this.memtable_flush_period_in_ms; + } + + public CfDef setMemtable_flush_period_in_ms(int memtable_flush_period_in_ms) { + this.memtable_flush_period_in_ms = memtable_flush_period_in_ms; + setMemtable_flush_period_in_msIsSet(true); + return this; + } + + public void unsetMemtable_flush_period_in_ms() { + __isset_bit_vector.clear(__MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID); + } + + /** Returns true if field memtable_flush_period_in_ms is set (has been assigned a value) and false otherwise */ + public boolean isSetMemtable_flush_period_in_ms() { + return __isset_bit_vector.get(__MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID); + } + + public void setMemtable_flush_period_in_msIsSet(boolean value) { + __isset_bit_vector.set(__MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID, value); + } + /** * @deprecated */ @@ -1633,6 +1667,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav } break; + case MEMTABLE_FLUSH_PERIOD_IN_MS: + if (value == null) { + unsetMemtable_flush_period_in_ms(); + } else { + setMemtable_flush_period_in_ms((Integer)value); + } + break; + case ROW_CACHE_SIZE: if (value == null) { unsetRow_cache_size(); @@ -1784,6 +1826,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav case DCLOCAL_READ_REPAIR_CHANCE: return Double.valueOf(getDclocal_read_repair_chance()); + case MEMTABLE_FLUSH_PERIOD_IN_MS: + return Integer.valueOf(getMemtable_flush_period_in_ms()); + case ROW_CACHE_SIZE: return Double.valueOf(getRow_cache_size()); @@ -1869,6 +1914,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav return isSetCaching(); case DCLOCAL_READ_REPAIR_CHANCE: return isSetDclocal_read_repair_chance(); + case MEMTABLE_FLUSH_PERIOD_IN_MS: + return isSetMemtable_flush_period_in_ms(); case ROW_CACHE_SIZE: return isSetRow_cache_size(); case KEY_CACHE_SIZE: @@ -2104,6 +2151,15 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav return false; } + boolean this_present_memtable_flush_period_in_ms = true && this.isSetMemtable_flush_period_in_ms(); + boolean that_present_memtable_flush_period_in_ms = true && that.isSetMemtable_flush_period_in_ms(); + if (this_present_memtable_flush_period_in_ms || that_present_memtable_flush_period_in_ms) { + if (!(this_present_memtable_flush_period_in_ms && that_present_memtable_flush_period_in_ms)) + return false; + if (this.memtable_flush_period_in_ms != that.memtable_flush_period_in_ms) + return false; + } + boolean this_present_row_cache_size = true && this.isSetRow_cache_size(); boolean that_present_row_cache_size = true && that.isSetRow_cache_size(); if (this_present_row_cache_size || that_present_row_cache_size) { @@ -2311,6 +2367,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav if (present_dclocal_read_repair_chance) builder.append(dclocal_read_repair_chance); + boolean present_memtable_flush_period_in_ms = true && (isSetMemtable_flush_period_in_ms()); + builder.append(present_memtable_flush_period_in_ms); + if (present_memtable_flush_period_in_ms) + builder.append(memtable_flush_period_in_ms); + boolean present_row_cache_size = true && (isSetRow_cache_size()); builder.append(present_row_cache_size); if (present_row_cache_size) @@ -2592,6 +2653,16 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav return lastComparison; } } + lastComparison = Boolean.valueOf(isSetMemtable_flush_period_in_ms()).compareTo(typedOther.isSetMemtable_flush_period_in_ms()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMemtable_flush_period_in_ms()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.memtable_flush_period_in_ms, typedOther.memtable_flush_period_in_ms); + if (lastComparison != 0) { + return lastComparison; + } + } lastComparison = Boolean.valueOf(isSetRow_cache_size()).compareTo(typedOther.isSetRow_cache_size()); if (lastComparison != 0) { return lastComparison; @@ -2906,6 +2977,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } break; + case 38: // MEMTABLE_FLUSH_PERIOD_IN_MS + if (field.type == org.apache.thrift.protocol.TType.I32) { + this.memtable_flush_period_in_ms = iprot.readI32(); + setMemtable_flush_period_in_msIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; case 9: // ROW_CACHE_SIZE if (field.type == org.apache.thrift.protocol.TType.DOUBLE) { this.row_cache_size = iprot.readDouble(); @@ -3209,6 +3288,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav oprot.writeDouble(this.dclocal_read_repair_chance); oprot.writeFieldEnd(); } + if (isSetMemtable_flush_period_in_ms()) { + oprot.writeFieldBegin(MEMTABLE_FLUSH_PERIOD_IN_MS_FIELD_DESC); + oprot.writeI32(this.memtable_flush_period_in_ms); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -3401,6 +3485,12 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav sb.append(this.dclocal_read_repair_chance); first = false; } + if (isSetMemtable_flush_period_in_ms()) { + if (!first) sb.append(", "); + sb.append("memtable_flush_period_in_ms:"); + sb.append(this.memtable_flush_period_in_ms); + first = false; + } if (isSetRow_cache_size()) { if (!first) sb.append(", "); sb.append("row_cache_size:"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cli/CliClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java index a4b4483..197a870 100644 --- a/src/java/org/apache/cassandra/cli/CliClient.java +++ b/src/java/org/apache/cassandra/cli/CliClient.java @@ -137,6 +137,7 @@ public class CliClient COMPACTION_STRATEGY_OPTIONS, COMPRESSION_OPTIONS, BLOOM_FILTER_FP_CHANCE, + MEMTABLE_FLUSH_PERIOD_IN_MS, CACHING } @@ -1323,6 +1324,9 @@ public class CliClient case BLOOM_FILTER_FP_CHANCE: cfDef.setBloom_filter_fp_chance(Double.parseDouble(mValue)); break; + case MEMTABLE_FLUSH_PERIOD_IN_MS: + cfDef.setMemtable_flush_period_in_ms(Integer.parseInt(mValue)); + break; case CACHING: cfDef.setCaching(CliUtils.unescapeSQLString(mValue)); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index b50abc5..b2b3a3c 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -123,6 +123,7 @@ public final class CFMetaData + "key_validator text," + "min_compaction_threshold int," + "max_compaction_threshold int," + + "memtable_flush_period_in_ms int," + "key_alias text," // that one is kept for compatibility sake + "key_aliases text," + "bloom_filter_fp_chance double," @@ -258,6 +259,7 @@ public final class CFMetaData private volatile ByteBuffer valueAlias = null; private volatile Double bloomFilterFpChance = null; private volatile Caching caching = DEFAULT_CACHING_STRATEGY; + private int memtableFlushPeriod = 0; volatile Map<ByteBuffer, ColumnDefinition> column_metadata = new HashMap<ByteBuffer,ColumnDefinition>(); public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS; @@ -288,6 +290,7 @@ public final class CFMetaData public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;} public CFMetaData bloomFilterFpChance(Double prop) {bloomFilterFpChance = prop; return this;} public CFMetaData caching(Caching prop) {caching = prop; return this;} + public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;} public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc) { @@ -426,7 +429,8 @@ public final class CFMetaData .compactionStrategyOptions(oldCFMD.compactionStrategyOptions) .compressionParameters(oldCFMD.compressionParameters) .bloomFilterFpChance(oldCFMD.bloomFilterFpChance) - .caching(oldCFMD.caching); + .caching(oldCFMD.caching) + .memtableFlushPeriod(oldCFMD.memtableFlushPeriod); } /** @@ -539,6 +543,11 @@ public final class CFMetaData return caching; } + public int getMemtableFlushPeriod() + { + return memtableFlushPeriod; + } + public boolean equals(Object obj) { if (obj == this) @@ -575,6 +584,7 @@ public final class CFMetaData .append(compactionStrategyOptions, rhs.compactionStrategyOptions) .append(compressionParameters, rhs.compressionParameters) .append(bloomFilterFpChance, rhs.bloomFilterFpChance) + .append(memtableFlushPeriod, rhs.memtableFlushPeriod) .append(caching, rhs.caching) .isEquals(); } @@ -605,6 +615,7 @@ public final class CFMetaData .append(compactionStrategyOptions) .append(compressionParameters) .append(bloomFilterFpChance) + .append(memtableFlushPeriod) .append(caching) .toHashCode(); } @@ -677,6 +688,8 @@ public final class CFMetaData newCFMD.compactionStrategyOptions(new HashMap<String, String>(cf_def.compaction_strategy_options)); if (cf_def.isSetBloom_filter_fp_chance()) newCFMD.bloomFilterFpChance(cf_def.bloom_filter_fp_chance); + if (cf_def.isSetMemtable_flush_period_in_ms()) + newCFMD.memtableFlushPeriod(cf_def.memtable_flush_period_in_ms); if (cf_def.isSetCaching()) newCFMD.caching(Caching.fromString(cf_def.caching)); if (cf_def.isSetRead_repair_chance()) @@ -786,6 +799,7 @@ public final class CFMetaData valueAlias = cfm.valueAlias; bloomFilterFpChance = cfm.bloomFilterFpChance; + memtableFlushPeriod = cfm.memtableFlushPeriod; caching = cfm.caching; MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, cfm.column_metadata); @@ -879,6 +893,7 @@ public final class CFMetaData def.setCompression_options(compressionParameters.asThriftOptions()); if (bloomFilterFpChance != null) def.setBloom_filter_fp_chance(bloomFilterFpChance); + def.setMemtable_flush_period_in_ms(memtableFlushPeriod); def.setCaching(caching.toString()); return def; } @@ -1215,6 +1230,7 @@ public final class CFMetaData cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_validator")); cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "min_compaction_threshold")); cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "max_compaction_threshold")); + cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "memtable_flush_period_in_ms")); cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_alias")); cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_aliases")); cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "bloom_filter_fp_chance")); @@ -1268,6 +1284,7 @@ public final class CFMetaData cf.addColumn(Column.create(json(aliasesAsStrings(keyAliases)), timestamp, cfName, "key_aliases")); cf.addColumn(bloomFilterFpChance == null ? DeletedColumn.create(ldt, timestamp, cfName, "bloomFilterFpChance") : Column.create(bloomFilterFpChance, timestamp, cfName, "bloom_filter_fp_chance")); + cf.addColumn(Column.create(memtableFlushPeriod, timestamp, cfName, "memtable_flush_period_in_ms")); cf.addColumn(Column.create(caching.toString(), timestamp, cfName, "caching")); cf.addColumn(Column.create(compactionStrategyClass.getName(), timestamp, cfName, "compaction_strategy_class")); cf.addColumn(Column.create(json(compressionParameters.asThriftOptions()), timestamp, cfName, "compression_parameters")); @@ -1312,6 +1329,8 @@ public final class CFMetaData } if (result.has("bloom_filter_fp_chance")) cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance")); + if (result.has("memtable_flush_period_in_ms")) + cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms")); cfm.caching(Caching.valueOf(result.getString("caching"))); cfm.compactionStrategyClass(createCompactionStrategy(result.getString("compaction_strategy_class"))); cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters")))); @@ -1481,6 +1500,7 @@ public final class CFMetaData .append("compactionStrategyOptions", compactionStrategyOptions) .append("compressionOptions", compressionParameters.asThriftOptions()) .append("bloomFilterFpChance", bloomFilterFpChance) + .append("memtable_flush_period_in_ms", memtableFlushPeriod) .append("caching", caching) .toString(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql/AlterTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java index 5210f25..7680a6b 100644 --- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java @@ -186,6 +186,7 @@ public class AlterTableStatement cfm.maxCompactionThreshold(cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold())); cfm.caching(CFMetaData.Caching.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString()))); cfm.bloomFilterFpChance(cfProps.getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance())); + cfm.memtableFlushPeriod(cfProps.getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod())); if (!cfProps.compactionStrategyOptions.isEmpty()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql/CFPropDefs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/CFPropDefs.java b/src/java/org/apache/cassandra/cql/CFPropDefs.java index d50f2d0..091ee6a 100644 --- a/src/java/org/apache/cassandra/cql/CFPropDefs.java +++ b/src/java/org/apache/cassandra/cql/CFPropDefs.java @@ -50,6 +50,7 @@ public class CFPropDefs { public static final String KW_COMPACTION_STRATEGY_CLASS = "compaction_strategy_class"; public static final String KW_CACHING = "caching"; public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance"; + public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms"; // Maps CQL short names to the respective Cassandra comparator/validator class names public static final Map<String, String> comparators = new HashMap<String, String>(); @@ -91,6 +92,7 @@ public class CFPropDefs { keywords.add(KW_COMPACTION_STRATEGY_CLASS); keywords.add(KW_CACHING); keywords.add(KW_BF_FP_CHANCE); + keywords.add(KW_MEMTABLE_FLUSH_PERIOD); obsoleteKeywords.add("row_cache_size"); obsoleteKeywords.add("key_cache_size"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java index fed856f..992166e 100644 --- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java +++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java @@ -192,7 +192,8 @@ public class CreateColumnFamilyStatement .compactionStrategyOptions(cfProps.compactionStrategyOptions) .compressionParameters(CompressionParameters.create(cfProps.compressionParameters)) .caching(CFMetaData.Caching.fromString(getPropertyString(CFPropDefs.KW_CACHING, CFMetaData.DEFAULT_CACHING_STRATEGY.toString()))) - .bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null)); + .bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null)) + .memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0)); // CQL2 can have null keyAliases if (keyAlias != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql3/CFPropDefs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java index 0b563cc..61cefff 100644 --- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java +++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java @@ -17,20 +17,16 @@ */ package org.apache.cassandra.cql3; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.io.compress.CompressionParameters; public class CFPropDefs extends PropertyDefinitions @@ -46,6 +42,7 @@ public class CFPropDefs extends PropertyDefinitions public static final String KW_REPLICATEONWRITE = "replicate_on_write"; public static final String KW_CACHING = "caching"; public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance"; + public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms"; public static final String KW_COMPACTION = "compaction"; public static final String KW_COMPRESSION = "compression"; @@ -66,6 +63,7 @@ public class CFPropDefs extends PropertyDefinitions keywords.add(KW_BF_FP_CHANCE); keywords.add(KW_COMPACTION); keywords.add(KW_COMPRESSION); + keywords.add(KW_MEMTABLE_FLUSH_PERIOD); obsoleteKeywords.add("compaction_strategy_class"); obsoleteKeywords.add("compaction_strategy_options"); @@ -128,6 +126,7 @@ public class CFPropDefs extends PropertyDefinitions cfm.maxCompactionThreshold(toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold())); cfm.caching(CFMetaData.Caching.fromString(getString(KW_CACHING, cfm.getCaching().toString()))); cfm.bloomFilterFpChance(getDouble(KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance())); + cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod())); if (compactionStrategyClass != null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 439ef5f..434a5c4 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -33,10 +33,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; - -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.tracing.TraceState; -import org.apache.cassandra.tracing.Tracing; import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +52,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ExtendedFilter; +import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.index.SecondaryIndex; @@ -154,6 +151,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean maybeReloadCompactionStrategy(); + scheduleFlush(); + indexManager.reload(); // If the CF comparator has changed, we need to change the memtable, @@ -206,6 +205,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } + void scheduleFlush() + { + int period = metadata.getMemtableFlushPeriod(); + if (period > 0) + { + logger.debug("scheduling flush in {} ms", period); + WrappedRunnable runnable = new WrappedRunnable() + { + protected void runMayThrow() throws Exception + { + if (getMemtableThreadSafe().isExpired()) + { + Future<?> future = forceFlush(); + // if memtable is already expired but didn't flush because it's empty, + // then schedule another flush. + if (future == null) + scheduleFlush(); + } + } + }; + StorageService.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS); + } + } + public void setCompactionStrategyClass(String compactionStrategyClass) throws ConfigurationException { metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass); http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 82d22ca..bbbe272 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -112,6 +112,7 @@ public class Memtable this.cfs = cfs; this.creationTime = System.currentTimeMillis(); this.initialComparator = cfs.metadata.comparator; + this.cfs.scheduleFlush(); Callable<Set<Object>> provider = new Callable<Set<Object>>() { @@ -313,6 +314,15 @@ public class Memtable } /** + * @return true if this memtable is expired. Expiration time is determined by CF's memtable_flush_period_in_ms. + */ + public boolean isExpired() + { + int period = cfs.metadata.getMemtableFlushPeriod(); + return period > 0 && (System.currentTimeMillis() >= creationTime + period); + } + + /** * obtain an iterator of columns in this memtable in the specified order starting from a given column. */ public static OnDiskAtomIterator getSliceIterator(final DecoratedKey key, final ColumnFamily cf, SliceQueryFilter filter)
